You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:23:09 UTC
[33/38] incubator-ignite git commit: # ignite-41
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5b142ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5b142ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5b142ddd
Branch: refs/heads/ignite-1
Commit: 5b142ddd2d54172510035a7a6a72adad7e133b40
Parents: 575dbae
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 22 16:50:58 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 22 23:00:55 2014 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 26 +++--
.../cache/IgniteCacheExpiryPolicy.java | 7 +-
.../GridDistributedCacheAdapter.java | 18 ++-
.../GridDistributedTxRemoteAdapter.java | 12 +-
.../distributed/dht/GridDhtLockFuture.java | 18 ++-
.../distributed/dht/GridDhtLockRequest.java | 50 +++++++-
.../dht/GridDhtTransactionalCacheAdapter.java | 42 +++++--
.../distributed/dht/GridDhtTxLocalAdapter.java | 16 ++-
.../cache/distributed/dht/GridDhtTxRemote.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 19 ++-
.../dht/colocated/GridDhtColocatedCache.java | 87 +++++++++++---
.../colocated/GridDhtColocatedLockFuture.java | 23 +++-
.../distributed/near/GridNearAtomicCache.java | 1 +
.../distributed/near/GridNearCacheAdapter.java | 3 +-
.../distributed/near/GridNearLockFuture.java | 11 +-
.../distributed/near/GridNearLockRequest.java | 35 +++++-
.../near/GridNearTransactionalCache.java | 28 +++--
.../cache/distributed/near/GridNearTxLocal.java | 116 +++++++++++++++++--
.../processors/cache/local/GridLocalCache.java | 11 +-
.../processors/cache/local/GridLocalTx.java | 1 +
.../local/atomic/GridLocalAtomicCache.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 11 +-
.../transactions/IgniteTxLocalAdapter.java | 82 ++++++++++---
.../IgniteCacheExpiryPolicyAbstractTest.java | 43 +++++--
.../GridCacheInterceptorAbstractSelfTest.java | 1 +
25 files changed, 545 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 3780cbb..5295f20 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -533,6 +533,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
* @param retval Flag to return value.
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
+ * @param accessTtl TTL for read operation.
* @param filter Optional filter.
* @return Locks future.
*/
@@ -544,6 +545,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
boolean retval,
IgniteTxIsolation isolation,
boolean invalidate,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter);
/**
@@ -1743,11 +1745,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
subjId = ctx.subjectIdPerCall(subjId, prj);
- ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
-
- if (expiryPlc == null)
- expiryPlc = ctx.expiry();
-
return getAllAsync(keys,
entry,
!skipTx,
@@ -1755,7 +1752,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
taskName,
deserializePortable,
forcePrimary,
- GetExpiryPolicy.forPolicy(expiryPlc),
+ accessExpiryPolicy(prj != null ? prj.expiry() : null),
filter);
}
@@ -4552,6 +4549,17 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/**
+ * @param plc Explicitly specified expiry policy for cache operation.
+ * @return Expiry policy wrapper.
+ */
+ @Nullable public GetExpiryPolicy accessExpiryPolicy(@Nullable ExpiryPolicy plc) {
+ if (plc == null)
+ plc = ctx.expiry();
+
+ return GetExpiryPolicy.forPolicy(plc);
+ }
+
+ /**
* Cache operation.
*/
private abstract class SyncOp<T> {
@@ -4901,10 +4909,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return -1L;
}
- /**
- *
- */
- public synchronized void reset() {
+ /** {@inheritDoc} */
+ @Override public synchronized void reset() {
if (entries != null)
entries.clear();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
index 59cd937..d603b35 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheExpiryPolicy.java
@@ -36,7 +36,7 @@ public interface IgniteCacheExpiryPolicy {
public long forAccess();
/**
- * Callback when entry's ttl is updated on access.
+ * Callback for ttl update on entry access.
*
* @param key Entry key.
* @param keyBytes Entry key bytes.
@@ -49,6 +49,11 @@ public interface IgniteCacheExpiryPolicy {
@Nullable Collection<UUID> rdrs);
/**
+ * Clears information about updated entries.
+ */
+ public void reset();
+
+ /**
* @return Entries with TTL updated on access.
*/
@Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
index e19fe92..2d01a82 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -59,11 +59,12 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
boolean retval,
IgniteTxIsolation isolation,
boolean isInvalidate,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
assert tx != null;
- return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, filter);
+ return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, accessTtl, filter);
}
/** {@inheritDoc} */
@@ -72,7 +73,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
IgniteTxLocalEx<K, V> tx = ctx.tm().userTxx();
// Return value flag is true because we choose to bring values for explicit locks.
- return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, filter);
+ return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter);
}
/**
@@ -83,12 +84,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
* @param isRead Indicates whether value is read or written.
* @param retval Flag to return value.
* @param isolation Transaction isolation.
+ * @param accessTtl TTL for read operation.
* @param filter Optional filter.
* @return Future for locks.
*/
- protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
- @Nullable IgniteTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval,
- @Nullable IgniteTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter);
+ protected abstract IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ long timeout,
+ @Nullable IgniteTxLocalEx<K, V> tx,
+ boolean isInvalidate,
+ boolean isRead,
+ boolean retval,
+ @Nullable IgniteTxIsolation isolation,
+ long accessTtl,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter);
/**
* @param key Key to remove.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 1abf714..2e87441 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -578,17 +578,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
}
}
else if (op == READ) {
- ExpiryPolicy expiry = txEntry.expiry();
-
- if (expiry == null)
- expiry = cacheCtx.expiry();
-
- if (expiry != null) {
- Duration duration = expiry.getExpiryForAccess();
-
- if (duration != null)
- cached.updateTtl(null, CU.toTtl(duration));
- }
+ assert near();
if (log.isDebugEnabled())
log.debug("Ignoring READ entry when committing: " + txEntry);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 5a9ee72..2970568 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -121,6 +121,9 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
/** Pending locks. */
private final Collection<K> pendingLocks = new GridConcurrentHashSet<>();
+ /** TTL for read operation. */
+ private long accessTtl;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -138,6 +141,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
* @param timeout Lock acquisition timeout.
* @param tx Transaction.
* @param threadId Thread ID.
+ * @param accessTtl TTL for read operation.
* @param filter Filter.
*/
public GridDhtLockFuture(
@@ -150,10 +154,10 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
long timeout,
GridDhtTxLocalAdapter<K, V> tx,
long threadId,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
super(cctx.kernalContext(), CU.boolReducer());
- assert cctx != null;
assert nearNodeId != null;
assert nearLockVer != null;
assert topVer > 0;
@@ -166,6 +170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
this.timeout = timeout;
this.filter = filter;
this.tx = tx;
+ this.accessTtl = accessTtl;
if (tx != null)
tx.topologyVersion(topVer);
@@ -202,6 +207,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
}
/**
+ * @param cacheCtx Cache context.
* @param invalidPart Partition to retry.
*/
void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
@@ -827,7 +833,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
inTx() ? tx.groupLockKey() : null,
inTx() && tx.partitionLock(),
inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0);
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L);
try {
for (ListIterator<GridDhtCacheEntry<K, V>> it = dhtMapping.listIterator(); it.hasNext();) {
@@ -838,11 +845,13 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
boolean invalidateRdr = e.readerId(n.id()) != null;
+ IgniteTxEntry<K, V> entry = tx != null ? tx.entry(e.txKey()) : null;
+
req.addDhtKey(
e.key(),
e.getOrMarshalKeyBytes(),
tx != null ? tx.writeMap().get(e.txKey()) : null,
- tx != null ? tx.entry(e.txKey()).drVersion() : null,
+ entry != null ? entry.drVersion() : null,
invalidateRdr,
cctx);
@@ -906,7 +915,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
inTx() ? tx.groupLockKey() : null,
inTx() && tx.partitionLock(),
inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0);
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L);
try {
for (ListIterator<GridDhtCacheEntry<K, V>> it = nearMapping.listIterator(); it.hasNext();) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
index cb4ef69..f72f921 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -73,6 +73,9 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
@GridDirectVersion(3)
private BitSet preloadKeys;
+ /** TTL for read operation. */
+ private long accessTtl;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -81,6 +84,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
}
/**
+ * @param cacheId Cache ID.
* @param nodeId Node ID.
* @param nearXidVer Near transaction ID.
* @param threadId Thread ID.
@@ -98,6 +102,9 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
* @param txSize Expected transaction size.
* @param grpLockKey Group lock key.
* @param partLock {@code True} if partition lock.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param accessTtl TTL for read operation.
*/
public GridDhtLockRequest(
int cacheId,
@@ -119,10 +126,24 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
@Nullable IgniteTxKey grpLockKey,
boolean partLock,
@Nullable UUID subjId,
- int taskNameHash
+ int taskNameHash,
+ long accessTtl
) {
- super(cacheId, nodeId, nearXidVer, threadId, futId, lockVer, isInTx, isRead, isolation, isInvalidate, timeout,
- dhtCnt == 0 ? nearCnt : dhtCnt, txSize, grpLockKey, partLock);
+ super(cacheId,
+ nodeId,
+ nearXidVer,
+ threadId,
+ futId,
+ lockVer,
+ isInTx,
+ isRead,
+ isolation,
+ isInvalidate,
+ timeout,
+ dhtCnt == 0 ? nearCnt : dhtCnt,
+ txSize,
+ grpLockKey,
+ partLock);
this.topVer = topVer;
@@ -135,6 +156,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
this.miniId = miniId;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ this.accessTtl = accessTtl;
}
/** {@inheritDoc} */
@@ -239,6 +261,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
}
/**
+ * @param idx Key index.
* @return {@code True} if need to preload key with given index.
*/
public boolean needPreloadKey(int idx) {
@@ -282,6 +305,13 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
return miniId;
}
+ /**
+ * @return TTL for read operation.
+ */
+ public long accessTtl() {
+ return accessTtl;
+ }
+
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -330,6 +360,7 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
_clone.subjId = subjId;
_clone.taskNameHash = taskNameHash;
_clone.preloadKeys = preloadKeys;
+ _clone.accessTtl = accessTtl;
}
/** {@inheritDoc} */
@@ -417,6 +448,12 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
commState.idx++;
+ case 32:
+ if (!commState.putLong(accessTtl))
+ return false;
+
+ commState.idx++;
+
}
return true;
@@ -526,6 +563,13 @@ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
commState.idx++;
+ case 32:
+ if (buf.remaining() < 8)
+ return false;
+
+ accessTtl = commState.getLong();
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 67c01b3..dc8ddcb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -210,13 +210,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
tx.addWrite(
ctx,
- writeEntry == null ? (req.txRead() ? READ : NOOP) : writeEntry.op(),
+ writeEntry == null ? NOOP : writeEntry.op(),
txKey,
req.keyBytes() != null ? req.keyBytes().get(i) : null,
writeEntry == null ? null : writeEntry.value(),
writeEntry == null ? null : writeEntry.valueBytes(),
writeEntry == null ? null : writeEntry.transformClosures(),
- drVer);
+ drVer,
+ req.accessTtl());
if (req.groupLock())
tx.groupLockKey(txKey);
@@ -547,8 +548,17 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
boolean isRead,
boolean retval,
IgniteTxIsolation isolation,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- return lockAllAsyncInternal(keys, timeout, txx, isInvalidate, isRead, retval, isolation, filter);
+ return lockAllAsyncInternal(keys,
+ timeout,
+ txx,
+ isInvalidate,
+ isRead,
+ retval,
+ isolation,
+ accessTtl,
+ filter);
}
/**
@@ -561,6 +571,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param isRead Read flag.
* @param retval Return value flag.
* @param isolation Transaction isolation.
+ * @param accessTtl TTL for read operation.
* @param filter Optional filter.
* @return Lock future.
*/
@@ -571,6 +582,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
boolean isRead,
boolean retval,
IgniteTxIsolation isolation,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
if (keys == null || keys.isEmpty())
return new GridDhtFinishedFuture<>(ctx.kernalContext(), true);
@@ -589,6 +601,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
timeout,
tx,
tx.threadId(),
+ accessTtl,
filter);
for (K key : keys) {
@@ -637,6 +650,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
/**
+ * @param cacheCtx Cache context.
* @param nearNode Near node.
* @param req Request.
* @param filter0 Filter.
@@ -705,6 +719,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.timeout(),
tx,
req.threadId(),
+ req.accessTtl(),
filter);
// Add before mapping.
@@ -815,15 +830,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.drVersions(),
req.messageId(),
req.implicitTx(),
- req.txRead());
+ req.txRead(),
+ req.accessTtl());
final GridDhtTxLocal<K, V> t = tx;
return new GridDhtEmbeddedFuture<>(
txFut,
new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() {
- @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(GridCacheReturn<V> o,
- Exception e) {
+ @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(
+ GridCacheReturn<V> o, Exception e) {
if (e != null)
e = U.unwrap(e);
@@ -831,7 +847,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// Create response while holding locks.
final GridNearLockResponse<K, V> resp = createLockReply(nearNode,
- entries, req, t, t.xidVersion(), e);
+ entries,
+ req,
+ t,
+ t.xidVersion(),
+ e);
if (resp.error() == null && t.onePhaseCommit()) {
assert t.implicit();
@@ -880,8 +900,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
else if (!b)
e = new GridCacheLockTimeoutException(req.version());
- GridNearLockResponse<K, V> res = createLockReply(nearNode, entries, req,
- null, mappedVer, e);
+ GridNearLockResponse<K, V> res = createLockReply(nearNode,
+ entries,
+ req,
+ null,
+ mappedVer,
+ e);
sendLockReply(nearNode, null, req, res);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index d20a4a3..b752178 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -461,12 +461,15 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
}
/**
+ * @param cacheCtx Cache context.
* @param entries Entries to lock.
* @param writeEntries Write entries for implicit transactions mapped to one node.
+ * @param onePhaseCommit One phase commit flag.
* @param drVers DR versions.
* @param msgId Message ID.
* @param implicit Implicit flag.
* @param read Read flag.
+ * @param accessTtl TTL for read operation.
* @return Lock future.
*/
IgniteFuture<GridCacheReturn<V>> lockAllAsync(
@@ -477,7 +480,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
GridCacheVersion[] drVers,
long msgId,
boolean implicit,
- final boolean read
+ final boolean read,
+ long accessTtl
) {
try {
checkValid();
@@ -547,6 +551,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
txEntry.drExpireTime(w.drExpireTime());
txEntry.expiry(w.expiry());
}
+ else if (read)
+ txEntry.ttl(accessTtl);
txEntry.cached(cached, txEntry.keyBytes());
@@ -573,7 +579,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
if (log.isDebugEnabled())
log.debug("Lock keys: " + passedKeys);
- return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, null);
+ return obtainLockAsync(cacheCtx, ret, passedKeys, read, skipped, accessTtl, null);
}
catch (IgniteCheckedException e) {
setRollbackOnly();
@@ -588,6 +594,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
* @param passedKeys Passed keys.
* @param read {@code True} if read.
* @param skipped Skipped keys.
+ * @param accessTtl TTL for read operation.
* @param filter Entry write filter.
* @return Future for lock acquisition.
*/
@@ -597,6 +604,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
final Collection<? extends K> passedKeys,
final boolean read,
final Set<K> skipped,
+ final long accessTtl,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" +
@@ -614,6 +622,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
read,
/*retval*/false,
isolation,
+ accessTtl,
CU.<K, V>empty());
return new GridEmbeddedFuture<>(
@@ -631,7 +640,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
ret,
/*remove*/false,
/*retval*/false,
- /*read*/true,
+ /*read*/read,
+ accessTtl,
filter == null ? CU.<K, V>empty() : filter);
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index a545bc4..97ec1af 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -281,6 +281,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param valBytes Value bytes.
* @param drVer Data center replication version.
* @param clos Transform closures.
+ * @param ttl TTL.
*/
public void addWrite(GridCacheContext<K, V> cacheCtx,
GridCacheOperation op,
@@ -289,7 +290,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable V val,
@Nullable byte[] valBytes,
@Nullable Collection<IgniteClosure<V, V>> clos,
- @Nullable GridCacheVersion drVer) {
+ @Nullable GridCacheVersion drVer,
+ long ttl) {
checkInternal(key);
if (isSystemInvalidate())
@@ -301,7 +303,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
this,
op,
val,
- -1L,
+ ttl,
-1L,
cached,
drVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 65099ef..0b5d638 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -576,6 +576,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean isRead,
boolean retval,
@Nullable IgniteTxIsolation isolation,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " +
"GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)"));
@@ -734,8 +735,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
long topVer = ctx.affinity().affinityTopologyVersion();
- final GetExpiryPolicy expiry =
- GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+ final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!reload && !forcePrimary) {
@@ -2741,9 +2741,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void ttlUpdated(Object key,
- byte[] keyBytes,
- GridCacheVersion ver,
- @Nullable Collection<UUID> rdrs) {
+ byte[] keyBytes,
+ GridCacheVersion ver,
+ @Nullable Collection<UUID> rdrs) {
if (entries == null)
entries = new HashMap<>();
@@ -2767,6 +2767,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public void reset() {
+ if (entries != null)
+ entries.clear();
+
+ if (rdrsMap != null)
+ rdrsMap.clear();
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
return entries;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7534fd5..9878768 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -184,7 +184,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
deserializePortable,
filter,
- prj != null ? prj.expiry() : null);
+ accessExpiryPolicy(prj != null ? prj.expiry() : null));
}
/** {@inheritDoc} */
@@ -236,15 +236,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
String taskName,
boolean deserializePortable,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable ExpiryPolicy expiryPlc) {
+ @Nullable IgniteCacheExpiryPolicy expiryPlc) {
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
if (keyCheck)
validateCacheKeys(keys);
- final GetExpiryPolicy expiry =
- GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+ if (expiryPlc == null)
+ expiryPlc = accessExpiryPolicy(ctx.expiry());
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!reload && !forcePrimary) {
@@ -276,7 +276,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
null,
taskName,
filter,
- expiry);
+ expiryPlc);
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null) {
@@ -325,14 +325,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
if (success) {
- sendTtlUpdateRequest(expiry);
+ sendTtlUpdateRequest(expiryPlc);
return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
}
}
- if (expiry != null)
- expiry.reset();
+ if (expiryPlc != null)
+ expiryPlc.reset();
// Either reload or not all values are available locally.
GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
@@ -344,7 +344,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
subjId,
taskName,
deserializePortable,
- expiry);
+ expiryPlc);
fut.init();
@@ -363,13 +363,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean isRead,
boolean retval,
@Nullable IgniteTxIsolation isolation,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
assert tx == null || tx instanceof GridNearTxLocal;
GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx;
- GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, keys, txx, isRead, retval,
- timeout, filter);
+ GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx,
+ keys,
+ txx,
+ isRead,
+ retval,
+ timeout,
+ accessTtl,
+ filter);
// Future will be added to mvcc only if it was mapped to remote nodes.
fut.map();
@@ -570,6 +577,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
/**
+ * @param cacheCtx Cache context.
* @param tx Started colocated transaction (if any).
* @param threadId Thread ID.
* @param ver Lock version.
@@ -577,6 +585,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param keys Mapped keys.
* @param txRead Tx read.
* @param timeout Lock timeout.
+ * @param accessTtl TTL for read operation.
* @param filter Optional filter.
* @return Lock future.
*/
@@ -589,6 +598,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
final Collection<K> keys,
final boolean txRead,
final long timeout,
+ final long accessTtl,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
assert keys != null;
@@ -601,7 +611,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
// Check for exception.
keyFut.get();
- return lockAllAsync0(cacheCtx, tx, threadId, ver, topVer, keys, txRead, timeout, filter);
+ return lockAllAsync0(cacheCtx,
+ tx,
+ threadId,
+ ver,
+ topVer,
+ keys,
+ txRead,
+ timeout,
+ accessTtl,
+ filter);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(ctx.kernalContext(), e);
@@ -614,7 +633,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (exx != null)
return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
- return lockAllAsync0(cacheCtx, tx, threadId, ver, topVer, keys, txRead, timeout, filter);
+ return lockAllAsync0(cacheCtx,
+ tx,
+ threadId,
+ ver,
+ topVer,
+ keys,
+ txRead,
+ timeout,
+ accessTtl,
+ filter);
}
},
ctx.kernalContext());
@@ -622,6 +650,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
/**
+ * @param cacheCtx Cache context.
* @param tx Started colocated transaction (if any).
* @param threadId Thread ID.
* @param ver Lock version.
@@ -629,19 +658,35 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param keys Mapped keys.
* @param txRead Tx read.
* @param timeout Lock timeout.
+ * @param accessTtl TTL for read operation.
* @param filter Optional filter.
* @return Lock future.
*/
private IgniteFuture<Exception> lockAllAsync0(
GridCacheContext<K, V> cacheCtx,
- @Nullable final GridNearTxLocal<K, V> tx, long threadId,
- final GridCacheVersion ver, final long topVer, final Collection<K> keys, final boolean txRead,
- final long timeout, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ @Nullable final GridNearTxLocal<K, V> tx,
+ long threadId,
+ final GridCacheVersion ver,
+ final long topVer,
+ final Collection<K> keys,
+ final boolean txRead,
+ final long timeout,
+ final long accessTtl,
+ @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
int cnt = keys.size();
if (tx == null) {
- GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, ctx.localNodeId(), ver, topVer, cnt, txRead,
- timeout, tx, threadId, filter);
+ GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx,
+ ctx.localNodeId(),
+ ver,
+ topVer,
+ cnt,
+ txRead,
+ timeout,
+ tx,
+ threadId,
+ accessTtl,
+ filter);
// Add before mapping.
if (!ctx.mvcc().addFuture(fut))
@@ -704,7 +749,11 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (log.isDebugEnabled())
log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']');
- IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx, keys, tx.implicit(), txRead);
+ IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(cacheCtx,
+ keys,
+ tx.implicit(),
+ txRead,
+ accessTtl);
return new GridDhtEmbeddedFuture<>(
ctx.kernalContext(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 1a8f8d1..8c8a8e5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -101,6 +101,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
/** Trackable flag (here may be non-volatile). */
private boolean trackable;
+ /** TTL for read operation. */
+ private long accessTtl;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -115,6 +118,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
* @param read Read flag.
* @param retval Flag to return value or not.
* @param timeout Lock acquisition timeout.
+ * @param accessTtl TTL for read operation.
* @param filter Filter.
*/
public GridDhtColocatedLockFuture(
@@ -124,9 +128,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
boolean read,
boolean retval,
long timeout,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
super(cctx.kernalContext(), CU.boolReducer());
- assert cctx != null;
+
assert keys != null;
this.cctx = cctx;
@@ -135,6 +140,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
this.read = read;
this.retval = retval;
this.timeout = timeout;
+ this.accessTtl = accessTtl;
this.filter = filter;
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -710,7 +716,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
inTx() ? tx.groupLockKey() : null,
inTx() && tx.partitionLock(),
inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0);
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L);
mapping.request(req);
}
@@ -878,8 +885,16 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Before locally locking keys : " + keys);
- IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx, tx, threadId, lockVer,
- topVer, keys, read, timeout, filter);
+ IgniteFuture<Exception> fut = cctx.colocated().lockAllAsync(cctx,
+ tx,
+ threadId,
+ lockVer,
+ topVer,
+ keys,
+ read,
+ timeout,
+ accessTtl,
+ filter);
// Add new future.
add(new GridEmbeddedFuture<>(
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index fa19607..d7e32b3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -631,6 +631,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
boolean isRead,
boolean retval,
@Nullable IgniteTxIsolation isolation,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
return dht.lockAllAsync(keys, timeout, filter);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 83ac913..f3994bf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -279,8 +279,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
IgniteTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (IgniteTxLocalEx<K, V>)tx : null;
- final GetExpiryPolicy expiry =
- GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+ final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
index 2653fd0..f1f9a7f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -111,6 +111,9 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
@GridToStringExclude
private List<GridDistributedCacheEntry<K, V>> entries;
+ /** TTL for read operation. */
+ private long accessTtl;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -125,6 +128,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
* @param read Read flag.
* @param retval Flag to return value or not.
* @param timeout Lock acquisition timeout.
+ * @param accessTtl TTL for read operation.
* @param filter Filter.
*/
public GridNearLockFuture(
@@ -134,9 +138,10 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
boolean read,
boolean retval,
long timeout,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
super(cctx.kernalContext(), CU.boolReducer());
- assert cctx != null;
+
assert keys != null;
this.cctx = cctx;
@@ -145,6 +150,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
this.read = read;
this.retval = retval;
this.timeout = timeout;
+ this.accessTtl = accessTtl;
this.filter = filter;
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -852,7 +858,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
inTx() ? tx.groupLockKey() : null,
inTx() && tx.partitionLock(),
inTx() ? tx.subjectId() : null,
- inTx() ? tx.taskNameHash() : 0);
+ inTx() ? tx.taskNameHash() : 0,
+ read ? accessTtl : -1L);
mapping.request(req);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
index 0c7bd8c..6607a66 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -74,6 +74,9 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
/** Sync commit flag. */
private boolean syncCommit;
+ /** TTL for read operation. */
+ private long accessTtl;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -82,6 +85,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
}
/**
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @param nodeId Node ID.
* @param threadId Thread ID.
@@ -96,8 +100,12 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
* @param timeout Lock timeout.
* @param keyCnt Number of keys.
* @param txSize Expected transaction size.
+ * @param syncCommit Synchronous commit flag.
* @param grpLockKey Group lock key if this is a group-lock transaction.
* @param partLock If partition is locked.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param accessTtl TTL for read operation.
*/
public GridNearLockRequest(
int cacheId,
@@ -119,7 +127,8 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
@Nullable IgniteTxKey grpLockKey,
boolean partLock,
@Nullable UUID subjId,
- int taskNameHash
+ int taskNameHash,
+ long accessTtl
) {
super(
cacheId,
@@ -146,6 +155,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
this.syncCommit = syncCommit;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ this.accessTtl = accessTtl;
dhtVers = new GridCacheVersion[keyCnt];
}
@@ -291,6 +301,13 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
return true;
}
+ /**
+ * @return TTL for read operation.
+ */
+ public long accessTtl() {
+ return accessTtl;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -335,6 +352,7 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
_clone.taskNameHash = taskNameHash;
_clone.hasTransforms = hasTransforms;
_clone.syncCommit = syncCommit;
+ _clone.accessTtl = accessTtl;
}
/** {@inheritDoc} */
@@ -460,6 +478,13 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
return false;
commState.idx++;
+
+ case 35:
+ if (!commState.putLong(accessTtl))
+ return false;
+
+ commState.idx++;
+
}
return true;
@@ -607,6 +632,14 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
syncCommit = commState.getBoolean();
commState.idx++;
+
+ case 35:
+ if (buf.remaining() < 8)
+ return false;
+
+ accessTtl = commState.getLong();
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index f09d5c2..839a5e9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -130,12 +130,14 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
* @param keys Keys to load.
* @param filter Filter.
* @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
* @return Future.
*/
IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx,
@Nullable Collection<? extends K> keys,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- boolean deserializePortable) {
+ boolean deserializePortable,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc) {
assert tx != null;
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
@@ -147,7 +149,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
CU.subjectId(tx, ctx.shared()),
tx.resolveTaskName(),
deserializePortable,
- null);
+ expiryPlc);
// init() will register future for responses if it has remote mappings.
fut.init();
@@ -393,11 +395,23 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
}
/** {@inheritDoc} */
- @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
- IgniteTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval,
- IgniteTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx, keys, (GridNearTxLocal<K, V>)tx, isRead,
- retval, timeout, filter);
+ @Override protected IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+ long timeout,
+ IgniteTxLocalEx<K, V> tx,
+ boolean isInvalidate,
+ boolean isRead,
+ boolean retval,
+ IgniteTxIsolation isolation,
+ long accessTtl,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
+ keys,
+ (GridNearTxLocal<K, V>)tx,
+ isRead,
+ retval,
+ timeout,
+ accessTtl,
+ filter);
if (!ctx.mvcc().addFuture(fut))
throw new IllegalStateException("Duplicate future ID: " + fut);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
index 675933e..0d90831 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -27,6 +27,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -71,6 +72,9 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
/** True if transaction contains colocated cache entries mapped to local node. */
private boolean colocatedLocallyMapped;
+ /** Info for entries accessed locally in optimistic transaction. */
+ private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -86,9 +90,13 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
+ * @param invalidate Invalidate flag.
+ * @param storeEnabled Store enabled flag.
* @param txSize Transaction size.
* @param grpLockKey Group lock key if this is a group lock transaction.
* @param partLock {@code True} if this is a group-lock transaction and the whole partition should be locked.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
*/
public GridNearTxLocal(
GridCacheSharedContext<K, V> ctx,
@@ -263,12 +271,17 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> loadMissing(
GridCacheContext<K, V> cacheCtx,
- boolean async, final Collection<? extends K> keys,
+ boolean async,
+ final Collection<? extends K> keys,
boolean deserializePortable,
final IgniteBiInClosure<K, V> c
) {
if (cacheCtx.isNear()) {
- return cacheCtx.nearTx().txLoadAsync(this, keys, CU.<K, V>empty(), deserializePortable).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
+ return cacheCtx.nearTx().txLoadAsync(this,
+ keys,
+ CU.<K, V>empty(),
+ deserializePortable,
+ accessPolicy(cacheCtx, keys)).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
@Override public Boolean apply(IgniteFuture<Map<K, V>> f) {
try {
Map<K, V> map = f.get();
@@ -289,9 +302,15 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
});
}
else if (cacheCtx.isColocated()) {
- return cacheCtx.colocated().loadAsync(keys, /*reload*/false, /*force primary*/false, topologyVersion(),
- CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null, null)
- .chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
+ return cacheCtx.colocated().loadAsync(keys,
+ /*reload*/false,
+ /*force primary*/false,
+ topologyVersion(),
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ deserializePortable,
+ null,
+ accessPolicy(cacheCtx, keys)).chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
@Override public Boolean apply(IgniteFuture<Map<K, V>> f) {
try {
Map<K, V> map = f.get();
@@ -529,8 +548,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
*/
- void readyNearLocks(GridDistributedTxMapping<K, V> mapping, Collection<GridCacheVersion> pendingVers,
- Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
+ void readyNearLocks(GridDistributedTxMapping<K, V> mapping,
+ Collection<GridCacheVersion> pendingVers,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers)
+ {
Collection<IgniteTxEntry<K, V>> entries = groupLock() ?
Collections.singletonList(groupLockEntry()) :
F.concat(false, mapping.reads(), mapping.writes());
@@ -1044,8 +1066,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
}
/** {@inheritDoc} */
- public IgniteFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K, V> cacheCtx, final Collection<? extends K> keys,
- boolean implicit, boolean read) {
+ public IgniteFuture<GridCacheReturn<V>> lockAllAsync(GridCacheContext<K, V> cacheCtx,
+ final Collection<? extends K> keys,
+ boolean implicit,
+ boolean read,
+ long accessTtl) {
assert pessimistic();
try {
@@ -1066,7 +1091,14 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
log.debug("Before acquiring transaction lock on keys: " + keys);
IgniteFuture<Boolean> fut = cacheCtx.colocated().lockAllAsyncInternal(keys,
- lockTimeout(), this, isInvalidate(), read, /*retval*/false, isolation, CU.<K, V>empty());
+ lockTimeout(),
+ this,
+ isInvalidate(),
+ read,
+ /*retval*/false,
+ isolation,
+ accessTtl,
+ CU.<K, V>empty());
return new GridEmbeddedFuture<>(
fut,
@@ -1136,6 +1168,70 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
+ IgniteTxKey key,
+ @Nullable ExpiryPolicy expiryPlc)
+ {
+ assert optimistic();
+
+ if (expiryPlc == null)
+ expiryPlc = ctx.expiry();
+
+ if (expiryPlc != null) {
+ IgniteCacheExpiryPolicy plc = ctx.cache().accessExpiryPolicy(expiryPlc);
+
+ if (plc != null) {
+ if (accessMap == null)
+ accessMap = new HashMap<>();
+
+ accessMap.put(key, plc);
+ }
+
+ return plc;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys.
+ * @return Expiry policy.
+ */
+ private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext<K, V> cacheCtx, Collection<? extends K> keys) {
+ if (accessMap != null) {
+ for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+ if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key()))
+ return e.getValue();
+ }
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ super.close();
+
+ if (accessMap != null) {
+ assert optimistic();
+
+ for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+ if (e.getValue().entries() != null) {
+ GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId());
+
+ if (cctx0.isNear())
+ cctx0.near().dht().sendTtlUpdateRequest(e.getValue());
+ else
+ cctx0.dht().sendTtlUpdateRequest(e.getValue());
+ }
+ }
+
+ accessMap = null;
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
index 9ea91b9..c461894 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java
@@ -86,9 +86,14 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, long timeout,
- IgniteTxLocalEx<K, V> tx, boolean isRead,
- boolean retval, IgniteTxIsolation isolation, boolean invalidate,
+ @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+ long timeout,
+ IgniteTxLocalEx<K, V> tx,
+ boolean isRead,
+ boolean retval,
+ IgniteTxIsolation isolation,
+ boolean invalidate,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
return lockAllAsync(keys, timeout, tx, filter);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
index 0226ff2..86fed36 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
@@ -173,6 +173,7 @@ class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> {
rollbackAsync().get();
}
+ /** {@inheritDoc} */
@Override public IgniteFuture<IgniteTx> rollbackAsync() {
try {
state(ROLLING_BACK);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 88b6cfc..eaf0173 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -594,8 +594,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (keyCheck)
validateCacheKeys(keys);
- final GetExpiryPolicy expiry =
- GetExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+ final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
boolean success = true;
@@ -1287,6 +1286,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
boolean retval,
IgniteTxIsolation isolation,
boolean invalidate,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " +
"GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)"));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
index 8d3e6a0..1d4b5d7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@ -1149,8 +1149,15 @@ public class IgniteTxHandler<K, V> {
"(transaction has been completed): " + req.version());
}
- tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(),
- txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion());
+ tx.addWrite(cacheCtx,
+ txEntry.op(),
+ txEntry.txKey(),
+ txEntry.keyBytes(),
+ txEntry.value(),
+ txEntry.valueBytes(),
+ txEntry.transformClosures(),
+ txEntry.drVersion(),
+ txEntry.ttl());
if (!marked) {
if (tx.markFinalizing(USER_FINISH))
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8e742fd..27c1f44 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -410,6 +410,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/**
* Gets cache entry for given key.
*
+ * @param cacheCtx Cache context.
* @param key Key.
* @return Cache entry.
*/
@@ -420,6 +421,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/**
* Gets cache entry for given key and topology version.
*
+ * @param cacheCtx Cache context.
* @param key Key.
* @param topVer Topology version.
* @return Cache entry.
@@ -996,6 +998,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
}
+ protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext ctx,
+ IgniteTxKey key,
+ @Nullable ExpiryPolicy expiryPlc) {
+ return null;
+ }
+
/**
* Checks if there is a cached or swapped value for
* {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
@@ -1156,6 +1164,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
V val = null;
if (!pessimistic() || readCommitted() || groupLock()) {
+ IgniteCacheExpiryPolicy accessPlc =
+ optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
+
// This call will check for filter.
val = entry.innerGet(this,
/*swap*/true,
@@ -1169,7 +1180,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
null,
resolveTaskName(),
filter,
- null);
+ accessPlc);
if (val != null) {
V val0 = val;
@@ -1269,6 +1280,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* Loads all missed keys for
* {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
*
+ * @param cacheCtx Cache context.
* @param map Return map.
* @param missedMap Missed keys.
* @param redos Keys to retry.
@@ -1466,7 +1478,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
@Override public IgniteFuture<Map<K, V>> getAllAsync(
final GridCacheContext<K, V> cacheCtx,
Collection<? extends K> keys,
- @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable,
+ @Nullable GridCacheEntryEx<K, V> cached,
+ final boolean deserializePortable,
final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
@@ -1486,10 +1499,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
+ ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+
final Collection<K> lockKeys = enlistRead(cacheCtx,
keys,
cached,
- prj != null ? prj.expiry() : null,
+ expiryPlc,
retMap,
missed,
keysCnt,
@@ -1501,8 +1516,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
// Handle locks.
if (pessimistic() && !readCommitted() && !groupLock()) {
- IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true,
- isolation, isInvalidate(), CU.<K, V>empty());
+ if (expiryPlc == null)
+ expiryPlc = cacheCtx.expiry();
+
+ long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : -1L;
+
+ IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
+ lockTimeout(),
+ this,
+ true,
+ true,
+ isolation,
+ isInvalidate(),
+ accessTtl,
+ CU.<K, V>empty());
PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
@Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException {
@@ -2053,6 +2080,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param rmv {@code True} if remove.
* @param retval Flag to return value or not.
* @param read {@code True} if read.
+ * @param accessTtl TTL for read operation.
* @param filter Filter to check entries.
* @return Failed keys.
* @throws IgniteCheckedException If error.
@@ -2067,6 +2095,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
boolean rmv,
boolean retval,
boolean read,
+ long accessTtl,
IgnitePredicate<GridCacheEntry<K, V>>[] filter
) throws IgniteCheckedException {
for (K k : keys) {
@@ -2158,10 +2187,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
}
if (updateTtl) {
- ExpiryPolicy expiryPlc = txEntry.expiry() != null ? txEntry.expiry() : cacheCtx.expiry();
+ if (!read) {
+ ExpiryPolicy expiryPlc = txEntry.expiry() != null ? txEntry.expiry() : cacheCtx.expiry();
- if (expiryPlc != null)
- txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
+ if (expiryPlc != null)
+ txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess()));
+ }
+ else
+ txEntry.ttl(accessTtl);
}
break; // While.
@@ -2336,8 +2369,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock for put on keys: " + keys);
- IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, lockTimeout(), this, false,
- retval, isolation, isInvalidate(), CU.<K, V>empty());
+ IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys,
+ lockTimeout(),
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ CU.<K, V>empty());
PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) {
@Override public GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
@@ -2355,6 +2395,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/*remove*/false,
retval,
/*read*/false,
+ -1L,
filter);
return ret;
@@ -2530,8 +2571,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
- IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, lockTimeout(), this, false, retval,
- isolation, isInvalidate(), CU.<K, V>empty());
+ IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys,
+ lockTimeout(),
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L,
+ CU.<K, V>empty());
PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) {
@Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
@@ -2547,6 +2595,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
/*remove*/true,
retval,
/*read*/false,
+ -1L,
filter);
return ret;
@@ -2681,7 +2730,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
// Lock group key in pessimistic mode only.
return pessimistic() ?
- cacheCtx.cache().txLockAsync(enlisted, lockTimeout(), this, false, false, isolation, isInvalidate(),
+ cacheCtx.cache().txLockAsync(enlisted,
+ lockTimeout(),
+ this,
+ false,
+ false,
+ isolation,
+ isInvalidate(),
+ -1L,
CU.<K, V>empty()) :
new GridFinishedFuture<>(cctx.kernalContext());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5b142ddd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 6100479..a57da71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -163,40 +163,58 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
if (atomicityMode() == TRANSACTIONAL) {
- for (final Integer key : keys()) {
- log.info("Test txGet [key=" + key + ']');
+ IgniteTxConcurrency[] txModes = {PESSIMISTIC};
- txGet(key);
+ for (IgniteTxConcurrency txMode : txModes) {
+ for (final Integer key : keys()) {
+ log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']');
+
+ txGet(key, txMode);
+ }
}
- txGetAll();
+ for (IgniteTxConcurrency txMode : txModes) {
+ log.info("Test txGetAll [txMode=" + txMode + ']');
+
+ txGetAll(txMode);
+ }
}
}
/**
* @param key Key.
+ * @param txMode Transaction concurrency mode.
* @throws Exception If failed.
*/
- private void txGet(Integer key) throws Exception {
+ private void txGet(Integer key, IgniteTxConcurrency txMode) throws Exception {
IgniteCache<Integer, Integer> cache = jcache();
cache.put(key, 1);
checkTtl(key, 60_000L);
- try (IgniteTx tx = ignite(0).transactions().txStart()) {
+ try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
assertEquals((Integer)1, cache.get(key));
tx.commit();
}
checkTtl(key, 62_000L, true);
+
+ try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
+ assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(100L, 200L, 1000L)).get(key));
+
+ tx.commit();
+ }
+
+ checkTtl(key, 1000L, true);
}
/**
+ * @param txMode Transaction concurrency mode.
* @throws Exception If failed.
*/
- private void txGetAll() throws Exception {
+ private void txGetAll(IgniteTxConcurrency txMode) throws Exception {
IgniteCache<Integer, Integer> cache = jcache(0);
Map<Integer, Integer> vals = new HashMap<>();
@@ -206,7 +224,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
cache.putAll(vals);
- try (IgniteTx tx = ignite(0).transactions().txStart()) {
+ try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
assertEquals(vals, cache.getAll(vals.keySet()));
tx.commit();
@@ -214,6 +232,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
for (Integer key : vals.keySet())
checkTtl(key, 62_000L);
+
+ try (IgniteTx tx = ignite(0).transactions().txStart(txMode, REPEATABLE_READ)) {
+ assertEquals(vals, cache.withExpiryPolicy(new TestPolicy(100L, 200L, 1000L)).getAll(vals.keySet()));
+
+ tx.commit();
+ }
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 1000L);
}
/**