You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/16 16:12:21 UTC
[18/18] incubator-ignite git commit: # ignite-41
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/688a2e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/688a2e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/688a2e71
Branch: refs/heads/ignite-41
Commit: 688a2e71509b9b3cebd148045d7388386e5ce0fb
Parents: e85a938
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 16 13:06:26 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 16 18:11:24 2014 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 3 +-
.../processors/cache/GridCacheIoManager.java | 2 +
.../processors/cache/GridCacheMapEntry.java | 31 +-
.../cache/GridCacheTxLocalAdapter.java | 58 +++-
.../cache/GridCacheUpdateAtomicResult.java | 2 +-
.../distributed/GridCacheExpiryPolicy.java | 74 ++++-
.../GridDistributedTxRemoteAdapter.java | 2 +
.../dht/atomic/GridDhtAtomicCache.java | 79 +++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 30 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 172 ++++++----
.../atomic/GridNearAtomicUpdateResponse.java | 111 ++++++-
.../distributed/near/GridNearAtomicCache.java | 35 ++-
.../expiry/IgniteCacheExpiryPolicyTest.java | 310 +++++++++++++++++--
13 files changed, 749 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index f1f4436..39b7338 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -2043,8 +2043,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return tx.putAllAsync(ctx, F.t(key, val), true, cached, ttl, filter).get().value();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "put [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
}));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
index a222c32..a50e461 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
@@ -200,6 +200,8 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
unmarshall(nodeId, cacheMsg);
+ log.info("Message: " + cacheMsg);
+
if (cacheMsg.allowForStartup())
processMessage(nodeId, cacheMsg, c);
else {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 6483c8a..2c377e7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -151,7 +151,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
*/
protected GridCacheMapEntry(GridCacheContext<K, V> cctx, K key, int hash, V val,
GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
- log = U.logger(cctx.kernalContext(), logRef, this);
+ log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class);
if (cctx.portableEnabled())
key = (K)cctx.kernalContext().portable().detachPortable(key);
@@ -1112,6 +1112,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
@Nullable UUID subjId,
String taskName
) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ log.info("Inner set " + key + " " + val + " " + ttl);
+
V old;
boolean valid = valid(tx != null ? tx.topologyVersion() : topVer);
@@ -1630,7 +1632,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
return toTtl(duration);
}
- private static long toTtl(Duration duration) {
+ public static long toTtl(Duration duration) {
if (duration == null)
return -1;
@@ -1685,7 +1687,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
GridDrResolveResult<V> drRes = null;
- long newTtl = 0L;
+ long newTtl = -1L;
long newExpireTime = 0L;
long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node.
@@ -1869,12 +1871,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
}
+ long ttl0 = newTtl;
+
if (drRes == null) {
// Calculate TTL and expire time for local update.
if (drTtl >= 0L) {
assert drExpireTime >= 0L;
- newTtl = drTtl;
+ ttl0 = drTtl;
newExpireTime = drExpireTime;
}
else {
@@ -1902,10 +1906,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
else
newTtl = -1L;
- if (newTtl < 0)
- newTtl = ttlExtras();
+ ttl0 = newTtl < 0 ? ttlExtras() : newTtl;
- newExpireTime = toExpireTime(newTtl);
+ newExpireTime = toExpireTime(ttl0);
}
}
@@ -1937,7 +1940,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
// in load methods without actually holding entry lock.
updateIndex(updated, valBytes, newExpireTime, newVer, old);
- update(updated, valBytes, newExpireTime, newTtl, newVer);
+ update(updated, valBytes, newExpireTime, ttl0, newVer);
drReplicate(drType, updated, valBytes, newVer);
@@ -2048,7 +2051,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
res = hadVal;
// Do not propagate zeroed TTL and expire time.
- newTtl = 0L;
+ newTtl = -1L;
newDrExpireTime = -1L;
}
@@ -2500,7 +2503,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
* @param ttl Time to live.
* @return Expiration time.
*/
- protected long toExpireTime(long ttl) {
+ public static long toExpireTime(long ttl) {
long expireTime = ttl == 0 ? 0 : U.currentTimeMillis() + ttl;
// Account for overflow.
@@ -2953,14 +2956,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
/** {@inheritDoc} */
- /*
- @Override public synchronized GridDrEntry<K, V> drEntry() throws IgniteCheckedException {
- return new GridDrPlainEntry<>(key, isStartVersion() ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false),
- ttlExtras(), expireTimeExtras(), ver.drVersion());
- }
- */
-
- /** {@inheritDoc} */
@Override public synchronized V rawPut(V val, long ttl) {
V old = this.val;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index abb9fef..c6888f1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -25,6 +25,7 @@ import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -585,6 +586,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
addGroupTxMapping(writeSet());
if (!empty) {
+ log.info("User commit");
+
// We are holding transaction-level locks for entries here, so we can get next write version.
writeVersion(cctx.versions().next(topologyVersion()));
@@ -649,6 +652,19 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
V val = res.get2();
byte[] valBytes = res.get3();
+ if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) {
+ ExpiryPolicy expiry = cacheCtx.expiry();
+
+ if (expiry != null) {
+ Duration duration = cached.hasValue() ?
+ expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+
+ txEntry.ttl(GridCacheMapEntry.toTtl(duration));
+
+ log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", detached=" + cached.detached());
+ }
+ }
+
// Preserve TTL if needed.
if (txEntry.ttl() < 0)
txEntry.ttl(cached.ttl());
@@ -1154,7 +1170,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
missed.put(key, ver);
if (!readCommitted()) {
- txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null);
+ txEntry = addEntry(READ, val, null, entry, null, filter, true, -1L, -1L, null);
if (groupLock())
txEntry.groupLockEntry(true);
@@ -1179,7 +1195,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
// Value for which failure occurred.
V val = e.<V>value();
- txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null);
+ txEntry = addEntry(READ, val, null, entry, null, CU.<K, V>empty(), false, -1L, -1L, null);
// Mark as checked immediately for non-pessimistic.
if (val != null && !pessimistic())
@@ -1698,7 +1714,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
*
* @param keys Keys to enlist.
* @param cached Cached entry.
- * @param ttl Time to live for entry. If negative, leave unchanged.
+ * @param expiry Expiry policy for entry. If {@code null}, leave unchanged.
* @param implicit Implicit flag.
* @param lookup Value lookup map ({@code null} for remove).
* @param transformMap Map with transform closures if this is a transform operation.
@@ -1715,7 +1731,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
GridCacheContext<K, V> cacheCtx,
Collection<? extends K> keys,
@Nullable GridCacheEntryEx<K, V> cached,
- long ttl,
+ @Nullable ExpiryPolicy expiry,
boolean implicit,
@Nullable Map<? extends K, ? extends V> lookup,
@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
@@ -1856,7 +1872,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
if (!readCommitted() && old != null) {
// Enlist failed filters as reads for non-read-committed mode,
// so future ops will get the same values.
- txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L,
+ txEntry = addEntry(READ, old, null, entry, null, CU.<K, V>empty(), false, -1L, -1L,
null);
txEntry.markValid();
@@ -1869,7 +1885,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
}
txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
- old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+ old != null ? UPDATE : CREATE, val, transformClo, entry, expiry, filter, true, drTtl,
drExpireTime, drVer);
if (!implicit() && readCommitted())
@@ -1956,7 +1972,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
}
txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
- v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+ v != null ? UPDATE : CREATE, val, transformClo, entry, expiry, filter, true, drTtl,
drExpireTime, drVer);
enlisted.add(key);
@@ -2219,11 +2235,13 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
Collection<K> enlisted = new LinkedList<>();
+ GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
+
final IgniteFuture<Set<K>> loadFut = enlistWrite(
cacheCtx,
keySet,
cached,
- ttl,
+ prj != null ? prj.expiry() : null,
implicit,
map0,
transformMap0,
@@ -2404,7 +2422,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
cacheCtx,
keys0,
/** cached entry */null,
- /** ttl */-1,
+ /** expiry */null,
implicit,
/** lookup map */null,
/** transform map */null,
@@ -2555,7 +2573,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
cacheCtx,
keys,
/** cached entry */null,
- /** ttl - leave unchanged */-1,
+ /** expiry - leave unchanged */null,
/** implicit */false,
/** lookup map */null,
/** transform map */null,
@@ -2652,7 +2670,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
/**
* @param op Cache operation.
* @param val Value.
- * @param ttl Time to leave to set to tx entry. If {@code null}, leave unchanged.
+ * @param expiryPlc Expiry policy. If {@code null}, leave unchanged.
* @param transformClos Transform closure.
* @param entry Cache entry.
* @param filter Filter.
@@ -2662,10 +2680,16 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
* @param drVer DR version.
* @return Transaction entry.
*/
- protected final GridCacheTxEntry<K, V> addEntry(GridCacheOperation op, @Nullable V val,
- @Nullable IgniteClosure<V, V> transformClos, GridCacheEntryEx<K, V> entry, long ttl,
- IgnitePredicate<GridCacheEntry<K, V>>[] filter, boolean filtersSet, long drTtl,
- long drExpireTime, @Nullable GridCacheVersion drVer) {
+ protected final GridCacheTxEntry<K, V> addEntry(GridCacheOperation op,
+ @Nullable V val,
+ @Nullable IgniteClosure<V, V> transformClos,
+ GridCacheEntryEx<K, V> entry,
+ @Nullable ExpiryPolicy expiryPlc, // TODO IGNITE-41
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ boolean filtersSet,
+ long drTtl,
+ long drExpireTime,
+ @Nullable GridCacheVersion drVer) {
GridCacheTxKey<K> key = entry.txKey();
checkInternal(key);
@@ -2706,6 +2730,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
old.cached(entry, old.keyBytes());
old.filters(filter);
+ long ttl = -1L;
+
// Update ttl if specified.
if (drTtl >= 0L) {
assert drExpireTime >= 0L;
@@ -2721,6 +2747,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
log.debug("Updated transaction entry: " + txEntry);
}
else {
+ long ttl = -1L;
+
if (drTtl >= 0L)
ttl = drTtl;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
index 048df15..43ca819 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -98,7 +98,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
}
/**
- * @return New TTL.
+ * @return {@code -1} if TTL did not change, otherwise new TTL.
*/
public long newTtl() {
return newTtl;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
index f7fe27a..3a77884 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
@@ -9,6 +9,9 @@
package org.gridgain.grid.kernal.processors.cache.distributed;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
import javax.cache.expiry.*;
import java.io.*;
import java.util.concurrent.*;
@@ -27,11 +30,24 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
private static final byte UPDATE_TTL_MASK = 0x02;
/** */
+ private static final byte ACCESS_TTL_MASK = 0x04;
+
+ /** */
private Duration forCreate;
/** */
private Duration forUpdate;
+ /** */
+ private Duration forAccess;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public GridCacheExpiryPolicy() {
+ // No-op.
+ }
+
/**
* @param plc Expiry policy.
*/
@@ -48,9 +64,7 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
/** {@inheritDoc} */
@Override public Duration getExpiryForAccess() {
- assert false;
-
- return null;
+ return forAccess;
}
/** {@inheritDoc} */
@@ -58,6 +72,38 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
return forUpdate;
}
+ /**
+ * @param out Output stream.
+ * @param duration Duration.
+ * @throws IOException If failed to write the duration.
+ */
+ private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
+ if (duration != null) {
+ if (duration.isEternal())
+ out.writeLong(0);
+ else if (duration.getDurationAmount() == 0)
+ out.writeLong(1);
+ else
+ out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
+ }
+ }
+
+ /**
+ * @param in Input stream.
+ * @return Duration.
+ * @throws IOException If failed to read the duration.
+ */
+ private Duration readDuration(ObjectInput in) throws IOException {
+ long ttl = in.readLong();
+
+ assert ttl >= 0;
+
+ if (ttl == 0)
+ return Duration.ETERNAL;
+
+ return new Duration(TimeUnit.MILLISECONDS, ttl);
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
byte flags = 0;
@@ -72,7 +118,18 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
if (update != null)
flags |= UPDATE_TTL_MASK;
+ Duration access = plc.getExpiryForAccess();
+
+ if (access != null)
+ flags |= ACCESS_TTL_MASK;
+
out.writeByte(flags);
+
+ writeDuration(out, create);
+
+ writeDuration(out, update);
+
+ writeDuration(out, access);
}
/** {@inheritDoc} */
@@ -80,9 +137,16 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
byte flags = in.readByte();
if ((flags & CREATE_TTL_MASK) != 0)
- forCreate = new Duration(TimeUnit.MILLISECONDS, in.readLong());
+ forCreate = readDuration(in);
if ((flags & UPDATE_TTL_MASK) != 0)
- forUpdate = new Duration(TimeUnit.MILLISECONDS, in.readLong());
+ forUpdate = readDuration(in);
+
+ if ((flags & ACCESS_TTL_MASK) != 0)
+ forAccess = readDuration(in);
+ }
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheExpiryPolicy.class, this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3cd3e2d..3b34552 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -446,6 +446,8 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
@SuppressWarnings({"CatchGenericClass"})
private void commitIfLocked() throws IgniteCheckedException {
if (state() == COMMITTING) {
+ log.info("commitIfLocked");
+
for (GridCacheTxEntry<K, V> txEntry : writeMap.values()) {
assert txEntry != null : "Missing transaction entry for tx: " + this;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fd2d98d..64c95c3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1355,6 +1355,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
taskName);
+ assert updRes.newTtl() == -1L || (expiry != null || updRes.drExpireTime() >= 0);
+
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1366,7 +1368,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridDrResolveResult<V> ctx = updRes.drResolveResult();
long ttl = updRes.newTtl();
- long drExpireTime = updRes.drExpireTime();
+ long expireTime = updRes.drExpireTime();
if (ctx == null)
newDrVer = null;
@@ -1380,19 +1382,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (req.forceTransformBackups() && op == TRANSFORM)
transformC = (IgniteClosure<V, V>)writeVal;
- if (!readersOnly)
+ if (!readersOnly) {
dhtFut.addWriteEntry(entry,
updRes.newValue(),
newValBytes,
transformC,
- drExpireTime >= 0L ? ttl : -1L,
- drExpireTime,
- newDrVer,
- drExpireTime < 0L ? req.expiry() : null);
+ updRes.newTtl(),
+ expireTime,
+ newDrVer);
+ }
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), newValBytes,
- transformC, drExpireTime < 0L ? req.expiry() : null);
+ dhtFut.addNearWriteEntries(filteredReaders,
+ entry,
+ updRes.newValue(),
+ newValBytes,
+ transformC,
+ ttl,
+ expireTime);
}
else {
// TODO IGNITE-41 ttl could be changed.
@@ -1408,14 +1415,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
GridDrResolveResult<V> ctx = updRes.drResolveResult();
- // TODO IGNITE-41 dr ttl for near cache.
+ long ttl = updRes.newTtl();
+ long expireTime = updRes.drExpireTime();
if (ctx != null && ctx.isMerge())
newValBytes = null;
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue())
- res.addNearValue(i, updRes.newValue(), newValBytes);
+ res.addNearValue(i,
+ updRes.newValue(),
+ newValBytes,
+ ttl,
+ expireTime);
+ else
+ res.addNearTtl(i, ttl, expireTime);
if (updRes.newValue() != null || newValBytes != null) {
IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -1596,6 +1610,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
taskName);
+ assert updRes.newTtl() == -1L || expiry != null;
+
if (intercept) {
if (op == UPDATE)
ctx.config().getInterceptor().onAfterPut(entry.key(), updRes.newValue());
@@ -1624,25 +1640,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key());
if (!batchRes.readersOnly())
- dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.expiry());
+ dhtFut.addWriteEntry(entry,
+ writeVal,
+ valBytes,
+ transformC,
+ updRes.newTtl(),
+ -1,
+ null);
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, valBytes, transformC,
- req.expiry());
+ dhtFut.addNearWriteEntries(filteredReaders,
+ entry,
+ writeVal,
+ valBytes,
+ transformC,
+ updRes.newTtl(),
+ -1);
}
if (hasNear) {
if (primary) {
if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
- if (req.operation() == TRANSFORM) {
- int idx = firstEntryIdx + i;
+ int idx = firstEntryIdx + i;
+ if (req.operation() == TRANSFORM) {
GridCacheValueBytes valBytesTuple = entry.valueBytes();
byte[] valBytes = valBytesTuple.getIfMarshaled();
- res.addNearValue(idx, writeVal, valBytes);
+ res.addNearValue(idx,
+ writeVal,
+ valBytes,
+ updRes.newTtl(),
+ -1);
}
+ else
+ res.addNearTtl(idx, updRes.newTtl(), -1);
if (writeVal != null || !entry.valueBytes().isNull()) {
IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -2037,8 +2070,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
for (int i = 0; i < req.size(); i++) {
K key = req.key(i);
@@ -2058,6 +2089,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
UPDATE :
DELETE;
+ long ttl = req.drTtl(i);
+ long expireTime = req.drExpireTime(i);
+
+ if (ttl != -1L && expireTime == -1L)
+ expireTime = GridCacheMapEntry.toExpireTime(ttl);
+
GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
ver,
nodeId,
@@ -2067,15 +2104,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
valBytes,
/*write-through*/false,
/*retval*/false,
- expiry,
+ null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
/*check version*/!req.forceTransformBackups(),
CU.<K, V>empty(),
replicate ? DR_BACKUP : DR_NONE,
- req.drTtl(i),
- req.drExpireTime(i),
+ ttl,
+ expireTime,
req.drVersion(i),
false,
intercept,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3c7da7b..25bc875 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -202,7 +202,6 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
* @param drTtl DR TTL (optional).
* @param drExpireTime DR expire time (optional).
* @param drVer DR version (optional).
- * @param expiryPlc Expiry policy.
*/
public void addWriteEntry(GridDhtCacheEntry<K, V> entry,
@Nullable V val,
@@ -210,8 +209,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
IgniteClosure<V, V> transformC,
long drTtl,
long drExpireTime,
- @Nullable GridCacheVersion drVer,
- @Nullable ExpiryPolicy expiryPlc) {
+ @Nullable GridCacheVersion drVer) {
long topVer = updateReq.topologyVersion();
Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -237,7 +235,6 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
writeVer,
syncMode,
topVer,
- expiryPlc,
forceTransformBackups,
this.updateReq.subjectId(),
this.updateReq.taskNameHash());
@@ -245,8 +242,14 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
mappings.put(nodeId, updateReq);
}
- updateReq.addWriteValue(entry.key(), entry.keyBytes(), val, valBytes, transformC, drTtl,
- drExpireTime, drVer);
+ updateReq.addWriteValue(entry.key(),
+ entry.keyBytes(),
+ val,
+ valBytes,
+ transformC,
+ drTtl,
+ drExpireTime,
+ drVer);
}
}
}
@@ -256,14 +259,16 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
* @param entry Entry.
* @param val Value.
* @param valBytes Value bytes.
- * @param expiryPlc Expiry policy..
+ * @param ttl TTL for near cache update (optional).
+ * @param expireTime Expire time for near cache update (optional).
*/
public void addNearWriteEntries(Iterable<UUID> readers,
GridDhtCacheEntry<K, V> entry,
@Nullable V val,
@Nullable byte[] valBytes,
IgniteClosure<V, V> transformC,
- @Nullable ExpiryPolicy expiryPlc) {
+ long ttl,
+ long expireTime) {
GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
keys.add(entry.key());
@@ -287,7 +292,6 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
writeVer,
syncMode,
topVer,
- expiryPlc,
forceTransformBackups,
this.updateReq.subjectId(),
this.updateReq.taskNameHash());
@@ -300,7 +304,13 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
nearReadersEntries.put(entry.key(), entry);
- updateReq.addNearWriteValue(entry.key(), entry.keyBytes(), val, valBytes, transformC);
+ updateReq.addNearWriteValue(entry.key(),
+ entry.keyBytes(),
+ val,
+ valBytes,
+ transformC,
+ ttl,
+ expireTime);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index c3b0918..fda44c9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -77,14 +77,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
/** DR TTLs. */
private GridLongList drExpireTimes;
- /** Write synchronization mode. */
- private GridCacheWriteSynchronizationMode syncMode;
+ /** Near TTLs. */
+ private GridLongList nearTtls;
- /** Expiry policy. */
- private ExpiryPolicy expiryPlc;
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
- /** Expiry policy bytes. */
- private byte[] expiryPlcBytes;
+ /** Write synchronization mode. */
+ private GridCacheWriteSynchronizationMode syncMode;
/** Keys to update. */
@GridToStringInclude
@@ -154,7 +154,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param writeVer Write version for cache values.
* @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
- * @param expiryPlc Expiry policy.
* @param forceTransformBackups Force transform backups flag.
* @param subjId Subject ID.
*/
@@ -165,7 +164,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
GridCacheVersion writeVer,
GridCacheWriteSynchronizationMode syncMode,
long topVer,
- @Nullable ExpiryPolicy expiryPlc,
boolean forceTransformBackups,
UUID subjId,
int taskNameHash
@@ -175,7 +173,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
this.futVer = futVer;
this.writeVer = writeVer;
this.syncMode = syncMode;
- this.expiryPlc = expiryPlc;
this.topVer = topVer;
this.forceTransformBackups = forceTransformBackups;
this.subjId = subjId;
@@ -210,8 +207,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param drExpireTime DR expire time (optional).
* @param drVer DR version (optional).
*/
- public void addWriteValue(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer) {
+ public void addWriteValue(K key,
+ @Nullable byte[] keyBytes,
+ @Nullable V val,
+ @Nullable byte[] valBytes,
+ IgniteClosure<V, V> transformC,
+ long drTtl,
+ long drExpireTime,
+ @Nullable GridCacheVersion drVer) {
keys.add(key);
this.keyBytes.add(keyBytes);
@@ -265,8 +268,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param val Value, {@code null} if should be removed.
* @param valBytes Value bytes, {@code null} if should be removed.
*/
- public void addNearWriteValue(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC) {
+ public void addNearWriteValue(K key,
+ @Nullable byte[] keyBytes,
+ @Nullable V val,
+ @Nullable byte[] valBytes,
+ IgniteClosure<V, V> transformC,
+ long ttl,
+ long expireTime) {
if (nearKeys == null) {
nearKeys = new ArrayList<>();
nearKeyBytes = new ArrayList<>();
@@ -293,6 +301,28 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
nearVals.add(val);
nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
}
+
+ // Create the list lazily on the first non-negative TTL, back-filling earlier keys with -1.
+ if (ttl >= 0 && nearTtls == null) {
+ nearTtls = new GridLongList(nearKeys.size());
+
+ for (int i = 0; i < nearKeys.size() - 1; i++)
+ nearTtls.add(-1);
+ }
+
+ // Once the list exists every key gets a slot, otherwise indexes drift from nearKeys.
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (expireTime >= 0 && nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(nearKeys.size());
+
+ for (int i = 0; i < nearKeys.size() - 1; i++)
+ nearExpireTimes.add(-1);
+ }
+
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
}
/** {@inheritDoc} */
@@ -364,13 +394,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
/**
- * @return Expiry policy.
- */
- @Nullable public ExpiryPolicy expiry() {
- return expiryPlc;
- }
-
- /**
* @return Keys.
*/
public Collection<K> keys() {
@@ -542,6 +565,20 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
/**
+ * @param idx Index of the near key.
+ * @return TTL for near cache update, or {@code -1} if no near TTL list is present.
+ */
+ public long nearTtl(int idx) {
+ if (nearTtls != null) {
+ assert idx >= 0 && idx < nearTtls.size();
+
+ return nearTtls.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
* @return DR TTLs.
*/
@Nullable public GridLongList drExpireTimes() {
@@ -562,6 +599,20 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
return -1L;
}
+ /**
+ * @param idx Index of the near key.
+ * @return Expire time for near cache update, or {@code -1} if no near expire time list is present.
+ */
+ public long nearExpireTime(int idx) {
+ if (nearExpireTimes != null) {
+ assert idx >= 0 && idx < nearExpireTimes.size();
+
+ return nearExpireTimes.get(idx);
+ }
+
+ return -1L;
+ }
+
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -625,7 +676,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
_clone.drTtls = drTtls;
_clone.drExpireTimes = drExpireTimes;
_clone.syncMode = syncMode;
- _clone.expiryPlc = expiryPlc;
_clone.nearKeys = nearKeys;
_clone.nearKeyBytes = nearKeyBytes;
_clone.nearVals = nearVals;
@@ -635,6 +685,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
_clone.transformClosBytes = transformClosBytes;
_clone.nearTransformClos = nearTransformClos;
_clone.nearTransformClosBytes = nearTransformClosBytes;
+ _clone.nearExpireTimes = nearExpireTimes;
+ _clone.nearTtls = nearTtls;
_clone.subjId = subjId;
_clone.taskNameHash = taskNameHash;
}
@@ -746,12 +798,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 11:
- if (!commState.putByteArray(expiryPlcBytes))
- return false;
-
- commState.idx++;
-
- case 12:
if (valBytes != null) {
if (commState.it == null) {
if (!commState.putInt(valBytes.size()))
@@ -778,13 +824,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 13:
+ case 12:
if (!commState.putCacheVersion(writeVer))
return false;
commState.idx++;
- case 14:
+ case 13:
if (nearKeyBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearKeyBytes.size()))
@@ -811,7 +857,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 15:
+ case 14:
if (nearValBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearValBytes.size()))
@@ -838,13 +884,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 16:
+ case 15:
if (!commState.putBoolean(forceTransformBackups))
return false;
commState.idx++;
- case 17:
+ case 16:
if (nearTransformClosBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearTransformClosBytes.size()))
@@ -871,7 +917,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 18:
+ case 17:
if (transformClosBytes != null) {
if (commState.it == null) {
if (!commState.putInt(transformClosBytes.size()))
@@ -898,18 +944,29 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 19:
+ case 18:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 20:
+ case 19:
if (!commState.putInt(taskNameHash))
return false;
commState.idx++;
+ case 20:
+ if (!commState.putLongList(nearExpireTimes))
+ return false;
+
+ commState.idx++;
+
+ case 21:
+ if (!commState.putLongList(nearTtls))
+ return false;
+
+ commState.idx++;
}
return true;
@@ -1041,16 +1098,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 11:
- byte[] expiryPlcBytes0 = commState.getByteArray();
-
- if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
- return false;
-
- expiryPlcBytes = expiryPlcBytes0;
-
- commState.idx++;
-
- case 12:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1079,7 +1126,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 13:
+ case 12:
GridCacheVersion writeVer0 = commState.getCacheVersion();
if (writeVer0 == CACHE_VER_NOT_READ)
@@ -1089,7 +1136,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 14:
+ case 13:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1118,7 +1165,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 15:
+ case 14:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1147,7 +1194,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 16:
+ case 15:
if (buf.remaining() < 1)
return false;
@@ -1155,7 +1202,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 17:
+ case 16:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1184,7 +1231,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 18:
+ case 17:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1213,7 +1260,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 19:
+ case 18:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -1223,7 +1270,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 20:
+ case 19:
if (buf.remaining() < 4)
return false;
@@ -1231,6 +1278,25 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
+ case 20:
+ GridLongList nearExpireTimes0 = commState.getLongList();
+
+ if (nearExpireTimes0 == LONG_LIST_NOT_READ)
+ return false;
+
+ nearExpireTimes = nearExpireTimes0;
+
+ commState.idx++;
+
+ case 21:
+ GridLongList nearTtls0 = commState.getLongList();
+
+ if (nearTtls0 == LONG_LIST_NOT_READ)
+ return false;
+
+ nearTtls = nearTtls0;
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index fd4d7dc..0be44aa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.*;
import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -94,6 +95,12 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
@GridDirectVersion(1)
private GridCacheVersion nearVer;
+ /** Near TTLs. */
+ private GridLongList nearTtls;
+
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -186,20 +193,87 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
* @param keyIdx Key index.
* @param val Value.
* @param valBytes Value bytes.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
*/
- public void addNearValue(int keyIdx, @Nullable V val, @Nullable byte[] valBytes) {
+ public void addNearValue(int keyIdx,
+ @Nullable V val,
+ @Nullable byte[] valBytes,
+ long ttl,
+ long expireTime) {
if (nearValsIdxs == null) {
nearValsIdxs = new ArrayList<>();
nearValBytes = new ArrayList<>();
nearVals = new ArrayList<>();
}
+ addNearTtl(keyIdx, ttl, expireTime);
+
nearValsIdxs.add(keyIdx);
nearVals.add(val);
nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
}
/**
+ * Records TTL and expire time for a key. Each list is created on the first non-negative
+ * value and back-filled with {@code -1} up to {@code keyIdx}; afterwards every call appends.
+ *
+ * @param keyIdx Key index.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+ if (nearTtls == null && ttl >= 0) {
+ nearTtls = new GridLongList(16);
+
+ for (int pad = 0; pad < keyIdx; pad++)
+ nearTtls.add(-1L);
+ }
+
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (nearExpireTimes == null && expireTime >= 0) {
+ nearExpireTimes = new GridLongList(16);
+
+ for (int pad = 0; pad < keyIdx; pad++)
+ nearExpireTimes.add(-1);
+ }
+
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Expire time for near cache update, or {@code -1} if no near expire time list is present.
+ */
+ public long nearExpireTime(int idx) {
+ if (nearExpireTimes != null) {
+ assert idx >= 0 && idx < nearExpireTimes.size();
+
+ return nearExpireTimes.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param idx Key index.
+ * @return TTL for near cache update, or {@code -1} if no near TTL list is present.
+ */
+ public long nearTtl(int idx) {
+ if (nearTtls != null) {
+ assert idx >= 0 && idx < nearTtls.size();
+
+ return nearTtls.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
* @param nearVer Version generated on primary node to be used for originating node's near cache update.
*/
public void nearVersion(GridCacheVersion nearVer) {
@@ -221,6 +295,8 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
nearSkipIdxs = new ArrayList<>();
nearSkipIdxs.add(keyIdx);
+
+ addNearTtl(keyIdx, -1L, -1L);
}
/**
@@ -366,6 +442,8 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
_clone.nearVals = nearVals;
_clone.nearValBytes = nearValBytes;
_clone.nearVer = nearVer;
+ _clone.nearTtls = nearTtls;
+ _clone.nearExpireTimes = nearExpireTimes;
}
/** {@inheritDoc} */
@@ -501,6 +579,17 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
+ case 12:
+ if (!commState.putLongList(nearExpireTimes))
+ return false;
+
+ commState.idx++;
+
+ case 13:
+ if (!commState.putLongList(nearTtls))
+ return false;
+
+ commState.idx++;
}
return true;
@@ -662,6 +751,26 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
+ case 12:
+ GridLongList nearExpireTimes0 = commState.getLongList();
+
+ if (nearExpireTimes0 == LONG_LIST_NOT_READ)
+ return false;
+
+ nearExpireTimes = nearExpireTimes0;
+
+ commState.idx++;
+
+ case 13:
+ GridLongList nearTtls0 = commState.getLongList();
+
+ if (nearTtls0 == LONG_LIST_NOT_READ)
+ return false;
+
+ nearTtls = nearTtls0;
+
+ commState.idx++;
+
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2aa32c3..1da6626 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -148,12 +148,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
}
+ long ttl = res.nearTtl(i);
+ long expireTime = res.nearExpireTime(i);
+
+ if (ttl != -1L && expireTime == -1L)
+ expireTime = GridCacheMapEntry.toExpireTime(ttl);
+
try {
processNearAtomicUpdateResponse(ver,
key,
val,
valBytes,
- req.expiry(),
+ ttl,
+ expireTime,
req.nodeId(),
req.subjectId(),
taskName);
@@ -169,7 +176,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param key Key.
* @param val Value.
* @param valBytes Value bytes.
- * @param expiryPlc Expiry policy.
+ * @param ttl TTL.
+ * @param expireTime Expire time.
* @param nodeId Node ID.
* @throws IgniteCheckedException If failed.
*/
@@ -178,7 +186,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
K key,
@Nullable V val,
@Nullable byte[] valBytes,
- ExpiryPolicy expiryPlc,
+ long ttl,
+ long expireTime,
UUID nodeId,
UUID subjId,
String taskName
@@ -203,15 +212,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
valBytes,
/*write-through*/false,
/*retval*/false,
- expiryPlc != null ? expiryPlc : ctx.expiry(),
+ null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
/*check version*/true,
CU.<K, V>empty(),
DR_NONE,
- -1,
- -1,
+ ttl,
+ expireTime,
null,
false,
false,
@@ -260,8 +269,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
for (int i = 0; i < req.nearSize(); i++) {
K key = req.nearKey(i);
@@ -292,6 +299,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
UPDATE :
DELETE;
+ long ttl = req.nearTtl(i);
+ long expireTime = req.nearExpireTime(i);
+
+ if (ttl != -1L && expireTime == -1L)
+ expireTime = GridCacheMapEntry.toExpireTime(ttl);
+
GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
ver,
nodeId,
@@ -301,15 +314,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
valBytes,
/*write-through*/false,
/*retval*/false,
- expiry,
+ null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
/*check version*/!req.forceTransformBackups(),
CU.<K, V>empty(),
DR_NONE,
- -1,
- -1,
+ ttl,
+ expireTime,
null,
false,
intercept,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
index 0d22f62..f96e5a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
@@ -36,6 +36,9 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
/** */
private Factory<? extends ExpiryPolicy> factory;
+ /** */
+ private boolean nearCache;
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
// No-op.
@@ -48,7 +51,160 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
/** {@inheritDoc} */
@Override protected int gridCount() {
- return 2;
+ return 3;
+ }
+
+ public void testPrimary() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = false;
+
+ boolean inTx = false;
+
+ startGrids();
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ GridCache<Integer, Object> cache0 = cache(0);
+
+ Integer key = primaryKey(cache0);
+
+ log.info("Create: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
+
+ cache.put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 60_000);
+
+ tx = inTx ? grid(0).transactions().txStart() : null;
+
+ log.info("Update: " + key);
+
+ cache.put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 61_000);
+ }
+
+ public void testBackup() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = false;
+
+ boolean inTx = false;
+
+ startGrids();
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ GridCache<Integer, Object> cache0 = cache(0);
+
+ Integer key = backupKey(cache0);
+
+ log.info("Create: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
+
+ cache.put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 60_000);
+
+ tx = inTx ? grid(0).transactions().txStart() : null;
+
+ log.info("Update: " + key);
+
+ cache.put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 61_000);
+ }
+
+ public void testNear() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = false;
+
+ boolean inTx = true;
+
+ startGrids();
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ GridCache<Integer, Object> cache0 = cache(0);
+
+ Integer key = nearKey(cache0);
+
+ log.info("Create: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
+
+ cache.put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 60_000);
+
+ tx = inTx ? grid(0).transactions().txStart() : null;
+
+ log.info("Update: " + key);
+
+ cache.put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 61_000);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void test1() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
+
+ nearCache = false;
+
+ boolean inTx = true;
+
+ startGrids();
+
+ Collection<Integer> keys = keys();
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ for (final Integer key : keys) {
+ log.info("Test key1: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
+
+ cache.put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+ }
+
+ for (final Integer key : keys) {
+ log.info("Test key2: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
+
+ cache.put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+ }
}
/**
@@ -93,6 +249,94 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testNearPut() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
+
+ nearCache = true;
+
+ startGrids();
+
+ GridCache<Integer, Object> cache0 = cache(0);
+
+ Integer key = nearKey(cache0);
+
+ IgniteCache<Integer, Integer> jcache0 = jcache(0);
+
+ jcache0.put(key, 1);
+
+ checkTtl(key, 60_000);
+
+ IgniteCache<Integer, Integer> jcache1 = jcache(1);
+
+ // Update from another node with provided TTL.
+ jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2);
+
+ checkTtl(key, 1000);
+
+ waitExpired(key);
+
+ jcache1.remove(key);
+
+ jcache0.put(key, 1);
+
+ checkTtl(key, 60_000);
+
+ // Update from near node with provided TTL.
+ jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
+
+ checkTtl(key, 1100);
+
+ waitExpired(key);
+ }
+
+ /**
+ * Checks TTLs after {@code putAll} from a non-near node and then from the near node (grid 0).
+ * @throws Exception If failed.
+ */
+ public void testNearPutAll() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
+ nearCache = true;
+
+ startGrids();
+
+ Map<Integer, Integer> vals = new HashMap<>();
+
+ for (int i = 0; i < 1000; i++)
+ vals.put(i, i);
+
+ IgniteCache<Integer, Integer> jcache0 = jcache(0);
+
+ jcache0.putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 60_000);
+
+ IgniteCache<Integer, Integer> jcache1 = jcache(1);
+
+ // Update from another node with provided TTL.
+ jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 1000);
+
+ waitExpired(vals.keySet());
+
+ jcache0.removeAll(vals.keySet());
+
+ jcache0.putAll(vals);
+
+ // Update from near node with provided TTL.
+ jcache0.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 1101L);
+
+ waitExpired(vals.keySet());
+ }
+
+ /**
* @return Test keys.
* @throws Exception If failed.
*/
@@ -106,7 +350,7 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
if (gridCount() > 1) {
keys.add(backupKey(cache));
- if (cache.configuration().getDistributionMode() == NEAR_PARTITIONED)
+ if (cache.configuration().getCacheMode() != REPLICATED)
keys.add(nearKey(cache));
}
@@ -117,16 +361,27 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
* @param key Key.
* @throws Exception If failed.
*/
- private void waitExpired(final Integer key) throws Exception {
+ private void waitExpired(Integer key) throws Exception {
+ waitExpired(Collections.singleton(key));
+ }
+
+ /**
+ * @param keys Keys.
+ * @throws Exception If failed.
+ */
+ private void waitExpired(final Collection<Integer> keys) throws Exception {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (int i = 0; i < gridCount(); i++) {
- Object val = jcache(i).localPeek(key);
+ for (Integer key : keys) {
+ Object val = jcache(i).localPeek(key);
- log.info("Value [grid=" + i + ", val=" + val + ']');
+ if (val != null) {
+ // log.info("Value [grid=" + i + ", val=" + val + ']');
- if (val != null)
- return false;
+ return false;
+ }
+ }
}
return false;
@@ -138,17 +393,23 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
for (int i = 0; i < gridCount(); i++) {
ClusterNode node = grid(i).cluster().localNode();
- Object val = jcache(i).localPeek(key);
+ for (Integer key : keys) {
+ Object val = jcache(i).localPeek(key);
- log.info("Value [grid=" + i +
- ", primary=" + cache.affinity().isPrimary(node, key) +
- ", backup=" + cache.affinity().isBackup(node, key) + ']');
+ if (val != null) {
+ log.info("Unexpected value [grid=" + i +
+ ", primary=" + cache.affinity().isPrimary(node, key) +
+ ", backup=" + cache.affinity().isBackup(node, key) + ']');
+ }
- assertNull("Unexpected non-null value for grid " + i, val);
+ assertNull("Unexpected non-null value for grid " + i, val);
+ }
}
- for (int i = 0; i < gridCount(); i++)
- assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+ for (int i = 0; i < gridCount(); i++) {
+ for (Integer key : keys)
+ assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+ }
}
/**
@@ -167,11 +428,8 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
if (e == null && cache.context().isNear())
e = cache.context().near().dht().peekEx(key);
- if (e == null) {
- assertTrue(i > 0);
-
+ if (e == null)
assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
- }
else
assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
}
@@ -184,10 +442,16 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
cfg.setCacheMode(PARTITIONED);
- cfg.setAtomicityMode(ATOMIC);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+
+ //cfg.setAtomicityMode(ATOMIC);
+
cfg.setBackups(1);
- cfg.setDistributionMode(PARTITIONED_ONLY);
+ if (nearCache && gridName.equals(getTestGridName(0)))
+ cfg.setDistributionMode(NEAR_PARTITIONED);
+ else
+ cfg.setDistributionMode(PARTITIONED_ONLY);
cfg.setExpiryPolicyFactory(factory);
@@ -213,11 +477,11 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
* @param update TTL for update.
*/
TestPolicy(@Nullable Long create,
- @Nullable Long access,
- @Nullable Long update) {
+ @Nullable Long update,
+ @Nullable Long access) {
this.create = create;
- this.access = access;
this.update = update;
+ this.access = access;
}
/** {@inheritDoc} */