You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2016/02/15 16:03:51 UTC
ignite git commit: IGNITE-2610 - WIP
Repository: ignite
Updated Branches:
refs/heads/ignite-2610 [created] 1adc02a58
IGNITE-2610 - WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1adc02a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1adc02a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1adc02a5
Branch: refs/heads/ignite-2610
Commit: 1adc02a584c51008fa17f75271fd3e94912043e9
Parents: 46b6a76
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Feb 15 18:03:36 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Feb 15 18:03:36 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 47 +--
.../processors/cache/GridCacheTtlManager.java | 73 +++-
.../distributed/dht/GridDhtCacheEntry.java | 2 +-
.../atomic/GridDhtAtomicOffHeapCacheEntry.java | 9 +
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../cache/query/GridCacheQueryManager.java | 367 ++++++++++++-------
.../IgniteCacheAtomicExpiryPolicyTest.java | 5 +
...gniteCacheAtomicOffheapExpiryPolicyTest.java | 35 ++
.../IgniteCacheExpiryPolicyAbstractTest.java | 245 +++++++++----
9 files changed, 549 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c1eeb5e..bf75fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -529,7 +529,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
// Set unswapped value.
- update(val, e.expireTime(), e.ttl(), e.version());
+ update(val, e.expireTime(), e.ttl(), e.version(), false);
// Must update valPtr again since update() will reset it.
if (cctx.offheapTiered() && e.offheapPointer() > 0)
@@ -945,7 +945,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hadValPtr = hasOffHeapPointer();
// Don't change version for read-through.
- update(ret, expTime, ttl, nextVer);
+ update(ret, expTime, ttl, nextVer, true);
if (hadValPtr && cctx.offheapTiered())
cctx.swap().removeOffheap(key);
@@ -1040,7 +1040,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
deletedUnlocked(true);
}
- update(ret, expTime, ttl, nextVer);
+ update(ret, expTime, ttl, nextVer, true);
touch = true;
@@ -1194,7 +1194,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (updateCntr != null && updateCntr != 0)
updateCntr0 = updateCntr;
- update(val, expireTime, ttl, newVer);
+ update(val, expireTime, ttl, newVer, true);
drReplicate(drType, val, newVer);
@@ -1356,7 +1356,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hadValPtr = hasOffHeapPointer();
- update(null, 0, 0, newVer);
+ update(null, 0, 0, newVer, true);
if (cctx.offheapTiered() && hadValPtr) {
boolean rmv = cctx.swap().removeOffheap(key);
@@ -1572,7 +1572,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
clearIndex(null);
- update(old, expireTime, ttl, ver);
+ update(old, expireTime, ttl, ver, true);
}
// Apply metrics.
@@ -1719,7 +1719,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert ttl != CU.TTL_ZERO;
- update(updated, expireTime, ttl, ver);
+ update(updated, expireTime, ttl, ver, true);
if (evt) {
CacheObject evtOld = null;
@@ -1756,7 +1756,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// in load methods without actually holding entry lock.
clearIndex(old);
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
+ update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
if (cctx.offheapTiered() && hasValPtr) {
boolean rmv = cctx.swap().removeOffheap(key);
@@ -2131,7 +2131,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
clearIndex(null);
- update(oldVal, initExpireTime, initTtl, ver);
+ update(oldVal, initExpireTime, initTtl, ver, true);
if (deletedUnlocked() && oldVal != null && !isInternal())
deletedUnlocked(false);
@@ -2345,7 +2345,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// in load methods without actually holding entry lock.
updateIndex(updated, newExpireTime, newVer, oldVal);
- update(updated, newExpireTime, newTtl, newVer);
+ update(updated, newExpireTime, newTtl, newVer, true);
updateCntr0 = nextPartCounter(topVer);
@@ -2430,7 +2430,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hasValPtr = hasOffHeapPointer();
// Clear value on backup. Entry will be removed from cache when it got evicted from queue.
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
+ update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
assert newSysTtl == CU.TTL_NOT_CHANGED;
assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
@@ -2707,7 +2707,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
- update(null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver, true);
deletedUnlocked(true);
@@ -2895,24 +2895,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param ttl Time to live.
* @param ver Update version.
*/
- protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) {
+ protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver, boolean addTracked) {
assert ver != null;
assert Thread.holdsLock(this);
assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
long oldExpireTime = expireTimeExtras();
- if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+ if (addTracked && oldExpireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
cctx.ttl().removeTrackedEntry(this);
value(val);
ttlAndExpireTimeExtras(ttl, expireTime);
- if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
- cctx.ttl().addTrackedEntry(this);
-
this.ver = ver;
+
+ if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
+ cctx.ttl().addTrackedEntry(this);
}
/**
@@ -2950,7 +2950,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param ttl Time to live.
*/
- private void updateTtl(long ttl) {
+ protected void updateTtl(long ttl) {
assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
assert Thread.holdsLock(this);
@@ -3224,7 +3224,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Override public synchronized CacheObject rawPut(CacheObject val, long ttl) {
CacheObject old = this.val;
- update(val, CU.toExpireTime(ttl), ttl, nextVersion());
+ update(val, CU.toExpireTime(ttl), ttl, nextVersion(), true);
return old;
}
@@ -3252,7 +3252,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateIndex(val, expTime, ver, null);
// Version does not change for load ops.
- update(val, expTime, ttl, ver);
+ update(val, expTime, ttl, ver, true);
boolean skipQryNtf = false;
@@ -3338,7 +3338,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(val,
unswapped.expireTime(),
unswapped.ttl(),
- unswapped.version()
+ unswapped.version(),
+ true
);
return true;
@@ -3397,7 +3398,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
// Version does not change for load ops.
- update(val, expTime, ttl, newVer);
+ update(val, expTime, ttl, newVer, true);
return newVer;
}
@@ -3612,7 +3613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!obsolete()) {
if (cctx.deferredDelete() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
- update(null, 0L, 0L, ver0 = ver);
+ update(null, 0L, 0L, ver0 = ver, true);
deletedUnlocked(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index bdb1f18..d119317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
@@ -74,6 +76,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
* @param entry Entry to add.
*/
public void addTrackedEntry(GridCacheMapEntry entry) {
+ assert Thread.holdsLock(entry);
+ assert cleanupWorker != null;
+
pendingEntries.add(new EntryWrapper(entry));
}
@@ -82,10 +87,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
*/
public void removeTrackedEntry(GridCacheMapEntry entry) {
assert Thread.holdsLock(entry);
+ assert cleanupWorker != null;
pendingEntries.remove(new EntryWrapper(entry));
}
+ /**
+ * @return Number of pending entries, as reported by {@code pendingEntries.sizex()}.
+ */
+ public int pendingSize() {
+ return pendingEntries.sizex();
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
@@ -150,6 +163,46 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
}
/**
+ * @param cctx Cache context used to unwrap the keys and to obtain their value bytes.
+ * @param key1 Left key to compare.
+ * @param key2 Right key to compare.
+ * @return Comparison result: by hash code first, then by value bytes length, then byte by byte.
+ */
+ private static int compareKeys(GridCacheContext cctx, CacheObject key1, CacheObject key2) {
+ int key1Hash = key1.hashCode();
+ int key2Hash = key2.hashCode();
+
+ int res = Integer.compare(key1Hash, key2Hash);
+
+ if (res == 0) {
+ key1 = (CacheObject)cctx.unwrapTemporary(key1);
+ key2 = (CacheObject)cctx.unwrapTemporary(key2);
+
+ try {
+ byte[] key1ValBytes = key1.valueBytes(cctx.cacheObjectContext());
+ byte[] key2ValBytes = key2.valueBytes(cctx.cacheObjectContext());
+
+ // Not a lexicographic comparison: lengths are compared first, bytes only when the lengths match.
+ res = Integer.compare(key1ValBytes.length, key2ValBytes.length);
+
+ if (res == 0) {
+ for (int i = 0; i < key1ValBytes.length; i++) {
+ res = Byte.compare(key1ValBytes[i], key2ValBytes[i]);
+
+ if (res != 0)
+ break;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return res;
+ }
+
+ /**
* Entry wrapper.
*/
private static class EntryWrapper implements Comparable<EntryWrapper> {
@@ -174,8 +227,12 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
@Override public int compareTo(EntryWrapper o) {
int res = Long.compare(expireTime, o.expireTime);
- if (res == 0)
- res = Long.compare(entry.startVersion(), o.entry.startVersion());
+ if (res == 0) {
+ // Must compare entries of the same cache.
+ assert entry.context() == o.entry.context();
+
+ res = compareKeys(entry.context(), entry.key(), o.entry.key());
+ }
return res;
}
@@ -190,7 +247,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
EntryWrapper that = (EntryWrapper)o;
- return expireTime == that.expireTime && entry.startVersion() == that.entry.startVersion();
+ return expireTime == that.expireTime && compareKeys(entry.context(), entry.key(), that.entry.key()) == 0;
}
@@ -198,10 +255,15 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
@Override public int hashCode() {
int res = (int)(expireTime ^ (expireTime >>> 32));
- res = 31 * res + (int)(entry.startVersion() ^ (entry.startVersion() >>> 32));
+ res = 31 * res + entry.key().hashCode();
return res;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EntryWrapper.class, this);
+ }
}
/**
@@ -230,10 +292,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
@Override public boolean add(EntryWrapper e) {
boolean res = super.add(e);
- assert res;
+ assert res : "Failed to add entry wrapper:" + e;
size.increment();
-
return res;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 14e3d3e..fae8219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -580,7 +580,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
clearIndex(prev);
// Give to GC.
- update(null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver, true);
if (swap) {
releaseSwap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
index 85cfb80..9321449 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
@@ -21,6 +21,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
/**
* DHT atomic cache entry for off-heap tiered or off-heap values modes.
@@ -52,6 +54,13 @@ public class GridDhtAtomicOffHeapCacheEntry extends GridDhtAtomicCacheEntry {
}
/** {@inheritDoc} */
+ @Override protected void updateTtl(long ttl) {
+ super.updateTtl(ttl);
+
+ valPtr = 0;
+ }
+
+ /** {@inheritDoc} */
@Override protected long offHeapPointer() {
return valPtr;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 026fb4d..943a91a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -166,7 +166,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (isNew() || !valid(topVer)) {
// Version does not change for load ops.
- update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+ update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
if (cctx.deferredDelete() && !isNew() && !isInternal()) {
boolean deleted = val == null;
@@ -402,7 +402,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
primaryNode(primaryNodeId, topVer);
- update(val, expireTime, ttl, ver);
+ update(val, expireTime, ttl, ver, true);
if (cctx.deferredDelete() && !isInternal()) {
boolean deleted = val == null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0d8f795..751279d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -809,161 +809,67 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
prj0 = prj0.keepBinary();
- final IgniteInternalCache<K, V> prj = prj0;
+ final IgniteInternalCache prj = prj0;
final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
try {
injectResources(keyValFilter);
- final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
-
- final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
-
final ExpiryPolicy plc = cctx.expiry();
final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
final boolean backups = qry.includeBackups() || cctx.isReplicated();
- final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
- new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- private IgniteBiTuple<K, V> next;
-
- private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
-
- private Iterator<K> iter;
+ Iterator<KeyCacheObject> keyIter;
- private GridDhtLocalPartition locPart;
+ GridDhtLocalPartition locPart = null;
- {
- Integer part = qry.partition();
+ Integer part = qry.partition();
- if (part == null || dht == null)
- iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
- else if (part < 0 || part >= cctx.affinity().partitions())
- iter = F.emptyIterator();
- else {
- locPart = dht.topology().localPartition(part, topVer, false);
-
- // double check for owning state
- if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
- locPart.state() != OWNING)
- throw new GridDhtUnreservedPartitionException(part,
- cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
-
- iter = new Iterator<K>() {
- private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
+ if (part == null || cctx.isLocal())
+ keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+ else if (part < 0 || part >= cctx.affinity().partitions())
+ keyIter = F.emptyIterator();
+ else {
+ final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
- @Override public boolean hasNext() {
- return iter0.hasNext();
- }
+ locPart = dht.topology().localPartition(part, topVer, false);
- @Override public K next() {
- KeyCacheObject key = iter0.next();
+ // double check for owning state
+ if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING)
+ throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
+ "Partition can not be reserved");
- return (K)cctx.unwrapBinaryIfNeeded(key, true);
- }
+ final GridDhtLocalPartition locPart0 = locPart;
- @Override public void remove() {
- iter0.remove();
- }
- };
- }
+ keyIter = new Iterator<KeyCacheObject>() {
+ private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
- advance();
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
}
- @Override public boolean onHasNext() {
- return next != null;
+ @Override public KeyCacheObject next() {
+ return iter0.next();
}
- @Override public IgniteBiTuple<K, V> onNext() {
- if (next == null)
- throw new NoSuchElementException();
-
- IgniteBiTuple<K, V> next0 = next;
-
- advance();
-
- return next0;
+ @Override public void remove() {
+ iter0.remove();
}
+ };
+ }
- private void advance() {
- IgniteBiTuple<K, V> next0 = null;
-
- while (iter.hasNext()) {
- next0 = null;
-
- K key = iter.next();
-
- V val;
-
- try {
- GridCacheEntryEx entry = cache.peekEx(key);
-
- CacheObject cacheVal =
- entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
-
- val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true);
- }
- catch (GridCacheEntryRemovedException e) {
- val = null;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
-
- val = null;
- }
-
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = cctx.cache().expiryPolicy(plc);
- }
-
- if (val != null) {
- next0 = F.t(key, val);
-
- if (checkPredicate(next0))
- break;
- else
- next0 = null;
- }
- }
-
- next = next0 != null ?
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
- null;
-
- if (next == null)
- sendTtlUpdate();
- }
+ final GridDhtLocalPartition locPart0 = locPart;
+ final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+ new PeekValueExpiryAwareIterator<K, V>(keyIter, plc, topVer, keyValFilter, qry.keepBinary()) {
@Override protected void onClose() {
- sendTtlUpdate();
+ super.onClose();
- if (locPart != null)
- locPart.release();
- }
-
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = null;
- }
- }
-
- private boolean checkPredicate(Map.Entry<K, V> e) {
- if (keyValFilter != null) {
- Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, qry.keepBinary());
-
- return keyValFilter.apply(e0.getKey(), e0.getValue());
- }
-
- return true;
+ if (locPart0 != null)
+ locPart0.release();
}
};
@@ -975,10 +881,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
iters.add(heapIt);
if (cctx.isOffHeapEnabled())
- iters.add(offheapIterator(qry, backups));
+ iters.add(offheapIterator(qry, topVer, backups, plc));
if (cctx.swap().swapEnabled())
- iters.add(swapIterator(qry, backups));
+ iters.add(swapIterator(qry, topVer, backups, plc));
it = new CompoundIterator<>(iters);
}
@@ -1032,8 +938,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Swap iterator.
* @throws IgniteCheckedException If failed.
*/
- private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, boolean backups)
- throws IgniteCheckedException {
+ private GridIterator<IgniteBiTuple<K, V>> swapIterator(
+ GridCacheQueryAdapter<?> qry,
+ AffinityTopologyVersion topVer,
+ boolean backups,
+ ExpiryPolicy expPlc
+ ) throws IgniteCheckedException {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
Integer part = qry.partition();
@@ -1041,6 +951,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
cctx.swap().rawSwapIterator(part);
+ if (expPlc != null)
+ return scanExpiryIterator(
+ it,
+ topVer,
+ filter,
+ expPlc,
+ qry.keepBinary());
+
return scanIterator(it, filter, qry.keepBinary());
}
@@ -1049,9 +967,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param backups Include backups.
* @return Offheap iterator.
*/
- private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean backups) {
+ private GridIterator<IgniteBiTuple<K, V>> offheapIterator(
+ GridCacheQueryAdapter<?> qry,
+ AffinityTopologyVersion topVer,
+ boolean backups,
+ ExpiryPolicy expPlc
+ ) {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
+ if (expPlc != null) {
+ return scanExpiryIterator(
+ cctx.swap().rawOffHeapIterator(qry.partition(), true, backups),
+ topVer,
+ filter,
+ expPlc,
+ qry.keepBinary());
+ }
+
if (cctx.offheapTiered() && filter != null) {
OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary());
@@ -1125,6 +1057,38 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
};
}
+ private GridIteratorAdapter<IgniteBiTuple<K, V>> scanExpiryIterator(
+ final Iterator<Map.Entry<byte[], byte[]>> it,
+ AffinityTopologyVersion topVer,
+ @Nullable final IgniteBiPredicate<K, V> filter,
+ ExpiryPolicy expPlc,
+ final boolean keepBinary
+ ) {
+ Iterator <KeyCacheObject> keyIter = new Iterator<KeyCacheObject>() {
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject next() {
+ try {
+ return cctx.toCacheKeyObject(it.next().getKey());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ it.remove();
+ }
+ };
+
+ return new PeekValueExpiryAwareIterator<>(keyIter, expPlc, topVer, filter, keepBinary);
+ }
+
/**
* @param o Object to inject resources to.
* @throws IgniteCheckedException If failure occurred while injecting resources.
@@ -3115,4 +3079,145 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
false,
keepBinary);
}
+
+ private class PeekValueExpiryAwareIterator<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+ /** Expiry policy from which each {@code IgniteCacheExpiryPolicy} instance is created. */
+ private final ExpiryPolicy plc;
+
+ /** Cache whose entries are peeked: the DHT cache when there is one, otherwise {@code cctx.cache()}. */
+ private final GridCacheAdapter cache;
+
+ /** Topology version passed to {@code entry.peek(...)}. */
+ private final AffinityTopologyVersion topVer;
+
+ /** DHT cache used to send TTL update requests; {@code null} when the cache is local. */
+ private final GridDhtCacheAdapter dht;
+
+ /** Filter applied to each key-value pair; {@code null} means every pair is accepted. */
+ private final IgniteBiPredicate<K, V> keyValFilter;
+
+ /** Flag forwarded to {@code cctx.unwrapBinaryIfNeeded(...)} for keys, values and filter input. */
+ private final boolean keepBinary;
+
+ /** Tuple to be returned by the next {@code onNext()} call; {@code null} once the keys are exhausted. */
+ private IgniteBiTuple<K, V> next;
+
+ /** Policy instance passed to peek; recreated after each flush, {@code null} after the final TTL update. */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Keys whose values are peeked. */
+ private Iterator<KeyCacheObject> keyIt;
+
+ public PeekValueExpiryAwareIterator(
+ Iterator<KeyCacheObject> keyIt,
+ ExpiryPolicy plc,
+ AffinityTopologyVersion topVer,
+ IgniteBiPredicate<K, V> keyValFilter,
+ boolean keepBinary
+ ) {
+ this.keyIt = keyIt;
+ this.plc = plc;
+ this.topVer = topVer;
+ this.keyValFilter = keyValFilter;
+
+ dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+ cache = dht != null ? dht : cctx.cache();
+
+ this.keepBinary = keepBinary;
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+
+ advance();
+ }
+
+ @Override public boolean onHasNext() {
+ return next != null;
+ }
+
+ @Override public IgniteBiTuple<K, V> onNext() {
+ if (next == null)
+ throw new NoSuchElementException();
+
+ IgniteBiTuple<K, V> next0 = next;
+
+ advance();
+
+ return next0;
+ }
+
+ private void advance() {
+ IgniteBiTuple<K, V> next0 = null;
+
+ while (keyIt.hasNext()) {
+ next0 = null;
+
+ KeyCacheObject key = keyIt.next();
+
+ V val;
+
+ try {
+ GridCacheEntryEx entry = cache.peekEx(key);
+
+ CacheObject cacheVal =
+ entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
+
+ val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true);
+ }
+ catch (GridCacheEntryRemovedException e) {
+ val = null;
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
+
+ val = null;
+ }
+
+ if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
+
+ if (val != null) {
+ next0 = F.t(
+ (K)cctx.unwrapBinaryIfNeeded(key, keepBinary),
+ (V)cctx.unwrapBinaryIfNeeded(val, keepBinary));
+
+ if (checkPredicate(next0))
+ break;
+ else
+ next0 = null;
+ }
+ }
+
+ next = next0 != null ?
+ new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+ null;
+
+ if (next == null)
+ sendTtlUpdate();
+ }
+
+ @Override protected void onClose() {
+ sendTtlUpdate();
+ }
+
+ private void sendTtlUpdate() {
+ if (dht != null && expiryPlc != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+ }
+
+ private boolean checkPredicate(Map.Entry<K, V> e) {
+ if (keyValFilter != null) {
+ Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, keepBinary);
+
+ return keyValFilter.apply(e0.getKey(), e0.getValue());
+ }
+
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
index c2ee607..7ff3f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
@@ -54,4 +54,9 @@ public class IgniteCacheAtomicExpiryPolicyTest extends IgniteCacheExpiryPolicyAb
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public void testAccess() throws Exception {
+ super.testAccess();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..f218c14
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Runs the atomic expiry policy tests with the cache memory mode set to {@code OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicOffheapExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testAccess() throws Exception {
+ super.testAccess(); // NOTE(review): only delegates to the parent implementation; presumably a WIP leftover - confirm it is still needed.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 7d22206..f57d860 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -33,21 +33,30 @@ import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -95,7 +104,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
storeMap.clear();
}
-
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
CacheConfiguration cfg = super.cacheConfiguration(gridName);
@@ -105,6 +113,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
cfg.setExpiryPolicyFactory(factory);
+ cfg.setMemoryMode(memoryMode());
+
+ if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED)
+ cfg.setOffHeapMaxMemory(0);
+
if (disableEagerTtl)
cfg.setEagerTtl(false);
@@ -112,7 +125,44 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
- * @throws Exception If failed.
+ * @return Cache memory mode applied to the test cache configuration ({@code ONHEAP_TIERED} unless overridden).
+ */
+ protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.ONHEAP_TIERED;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testCreateUpdate0() throws Exception {
+ startGrids(1);
+
+ long ttl = 60L; // Interpreted as hours by the Duration created below.
+
+ final String key = "key1";
+
+ final IgniteCache<String, String> cache = jcache();
+
+ for (int i = 0; i < 1000; i++) { // Puts the same key repeatedly, each time through a newly created expiry-policy view.
+ final IgniteCache<String, String> cache0 = cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(TimeUnit.HOURS, ttl)));
+
+ cache0.put(key, key);
+
+ info("PUT DONE");
+ }
+
+ int pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize();
+
+ assertTrue("Too many pending entries: " + pSize, pSize <= 1); // Repeated updates of one key must leave at most one pending entry.
+
+ cache.remove(key);
+
+ pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize();
+
+ assertEquals(0, pSize); // ttl().pendingSize() is expected to be zero once the key has been removed.
+ }
+
+ /** * @throws Exception If failed.
*/
public void testZeroOnCreate() throws Exception {
factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO);
@@ -307,48 +357,50 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
startGrids();
- for (final Integer key : keys()) {
- log.info("Test access [key=" + key + ']');
-
- access(key);
- }
-
- accessGetAll();
-
- for (final Integer key : keys()) {
- log.info("Test filterAccessRemove access [key=" + key + ']');
-
- filterAccessRemove(key);
- }
-
- for (final Integer key : keys()) {
- log.info("Test filterAccessReplace access [key=" + key + ']');
-
- filterAccessReplace(key);
- }
-
- if (atomicityMode() == TRANSACTIONAL) {
- TransactionConcurrency[] txModes = {PESSIMISTIC};
-
- for (TransactionConcurrency txMode : txModes) {
- for (final Integer key : keys()) {
- log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']');
-
- txGet(key, txMode);
- }
- }
-
- for (TransactionConcurrency txMode : txModes) {
- log.info("Test txGetAll [txMode=" + txMode + ']');
-
- txGetAll(txMode);
- }
- }
+// for (final Integer key : keys()) {
+// log.info("Test access [key=" + key + ']');
+//
+// access(key);
+// }
+//
+// accessGetAll();
+//
+// for (final Integer key : keys()) {
+// log.info("Test filterAccessRemove access [key=" + key + ']');
+//
+// filterAccessRemove(key);
+// }
+//
+// for (final Integer key : keys()) {
+// log.info("Test filterAccessReplace access [key=" + key + ']');
+//
+// filterAccessReplace(key);
+// }
+//
+// if (atomicityMode() == TRANSACTIONAL) {
+// TransactionConcurrency[] txModes = {PESSIMISTIC};
+//
+// for (TransactionConcurrency txMode : txModes) {
+// for (final Integer key : keys()) {
+// log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']');
+//
+// txGet(key, txMode);
+// }
+// }
+//
+// for (TransactionConcurrency txMode : txModes) {
+// log.info("Test txGetAll [txMode=" + txMode + ']');
+//
+// txGetAll(txMode);
+// }
+// }
IgniteCache<Integer, Integer> cache = jcache(0);
Collection<Integer> putKeys = keys();
+ info("Put keys: " + putKeys);
+
for (final Integer key : putKeys)
cache.put(key, key);
@@ -359,10 +411,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
while (it.hasNext())
itKeys.add(it.next().getKey());
+ info("It keys: " + itKeys);
+
assertTrue(itKeys.size() >= putKeys.size());
- for (Integer key : itKeys)
+ for (Integer key : itKeys) {
+ info("Checking iterator key: " + key);
+
checkTtl(key, 62_000L, true);
+ }
}
/**
@@ -1016,7 +1073,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
ClusterNode node = grid(i).cluster().localNode();
for (Integer key : keys) {
- Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP);
+ Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP);
if (val != null) {
log.info("Unexpected value [grid=" + i +
@@ -1059,51 +1116,53 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache();
- GridCacheEntryEx e = cache.peekEx(key);
+ if (cache.context().isNear())
+ cache = cache.context().near().dht();
- if (e == null && cache.context().isNear())
- e = cache.context().near().dht().peekEx(key);
+ while (true) {
+ try {
+ GridCacheEntryEx e = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ?
+ cache.peekEx(key) : cache.entryEx(key);
- if (e != null && e.deleted()) {
- assertEquals(0, e.ttl());
+ if (e != null && e.deleted()) {
+ assertEquals(0, e.ttl());
- assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+ assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
- continue;
- }
+ continue;
+ }
- if (e == null)
- assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
- else {
- found = true;
+ if (e == null)
+ assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+ else {
+ e.unswap();
- if (wait) {
- final GridCacheEntryEx e0 = e;
+ found = true;
- GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- try {
- return e0.ttl() == ttl;
- }
- catch (Exception e) {
- fail("Unexpected error: " + e);
+ if (wait)
+ waitTtl(cache, key, ttl);
- return true;
- }
- }
- }, 3000);
- }
+ boolean primary = cache.affinity().isPrimary(grid.localNode(), key);
+ boolean backup = cache.affinity().isBackup(grid.localNode(), key);
- boolean primary = cache.affinity().isPrimary(grid.localNode(), key);
- boolean backup = cache.affinity().isBackup(grid.localNode(), key);
+ assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e +
+ ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
- assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e +
- ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
+ if (ttl > 0)
+ assertTrue(e.expireTime() > 0);
+ else
+ assertEquals(0, e.expireTime());
+ }
- if (ttl > 0)
- assertTrue(e.expireTime() > 0);
- else
- assertEquals(0, e.expireTime());
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // Retry.
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ // No need to check.
+ break;
+ }
}
}
@@ -1111,6 +1170,40 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
+ * @param cache Cache whose entry is polled.
+ * @param key Key of the entry to check.
+ * @param ttl TTL value to wait for (compared with {@code entry.ttl()}).
+ * @throws IgniteInterruptedCheckedException If wait has been interrupted.
+ */
+ private void waitTtl(final GridCacheAdapter<Object, Object> cache, final Object key, final long ttl)
+ throws IgniteInterruptedCheckedException {
+ GridTestUtils.waitForCondition(new PAX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ?
+ cache.peekEx(key) : cache.entryEx(key);
+
+ assert entry != null;
+
+ entry.unswap();
+
+ return entry.ttl() == ttl;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // Retry: loop again and look the entry up afresh.
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ return true; // Condition reported as met; presumably nothing to wait for in this case (TODO confirm intent).
+ }
+ }
+ }
+ }, 3000);
+ }
+
+ /**
*
*/
private static class GetEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
@@ -1153,6 +1246,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
/** {@inheritDoc} */
@Override public Duration getExpiryForAccess() {
+ U.dumpStack(); // NOTE(review): debugging aid added in this WIP commit; presumably dumps the current stack on every call - remove before merge.
+
return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null;
}