You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/19 08:11:37 UTC
[3/6] ignite git commit: IGNITE-2610 - Fixed TTL issues with offheap cache memory mode - Fixes #488.
IGNITE-2610 - Fixed TTL issues with offheap cache memory mode - Fixes #488.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/078689b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/078689b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/078689b2
Branch: refs/heads/ignite-1232
Commit: 078689b2629d2d9a7dc418f504b7e6384b0da9fc
Parents: 9b5dcfe
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Feb 18 18:19:53 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Feb 18 18:19:53 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 2 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 105 ++---
.../processors/cache/GridCacheTtlManager.java | 75 +++-
.../distributed/dht/GridDhtCacheAdapter.java | 29 +-
.../distributed/dht/GridDhtCacheEntry.java | 2 +-
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../cache/query/GridCacheQueryManager.java | 424 +++++++++++++------
...CacheAtomicLocalOffheapExpiryPolicyTest.java | 30 ++
...gniteCacheAtomicOffheapExpiryPolicyTest.java | 30 ++
...rimaryWriteOrderOffheapExpiryPolicyTest.java | 31 ++
...teOrderWithStoreOffheapExpiryPolicyTest.java | 31 ++
...AtomicReplicatedOffheapExpiryPolicyTest.java | 30 ++
...eAtomicWithStoreOffheapExpiryPolicyTest.java | 30 ++
.../IgniteCacheExpiryPolicyAbstractTest.java | 169 ++++++--
.../IgniteCacheExpiryPolicyTestSuite.java | 13 +
...niteCacheTxLocalOffheapExpiryPolicyTest.java | 30 ++
.../IgniteCacheTxOffheapExpiryPolicyTest.java | 30 ++
...acheTxReplicatedOffheapExpiryPolicyTest.java | 30 ++
...CacheTxWithStoreOffheapExpiryPolicyTest.java | 30 ++
.../testsuites/IgniteBinaryCacheTestSuite.java | 2 +
.../cache/IgniteClientReconnectQueriesTest.java | 9 +-
22 files changed, 895 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 9bee307..71bb034 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -917,7 +917,7 @@ public interface GridCacheEntryEx {
* @param ver Version.
* @param ttl Time to live.
*/
- public void updateTtl(@Nullable GridCacheVersion ver, long ttl);
+ public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws GridCacheEntryRemovedException;
/**
* Called when entry should be evicted from offheap.
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 3a7bc8e..ae4a362 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -725,8 +725,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (log.isDebugEnabled())
log.debug("Entry was evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']');
}
- else if (log.isDebugEnabled())
- log.debug("Entry was not evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']');
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Entry was not evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']');
+ }
return evicted;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c1eeb5e..e8652e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -96,6 +96,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private static final byte IS_OFFHEAP_PTR_MASK = 0x04;
/** */
+ private static final byte IS_SWAPPING_REQUIRED = 0x08;
+
+ /** */
public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator();
/**
@@ -529,7 +532,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
// Set unswapped value.
- update(val, e.expireTime(), e.ttl(), e.version());
+ update(val, e.expireTime(), e.ttl(), e.version(), false);
// Must update valPtr again since update() will reset it.
if (cctx.offheapTiered() && e.offheapPointer() > 0)
@@ -550,7 +553,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @throws IgniteCheckedException If failed.
*/
private void swap() throws IgniteCheckedException {
- if (cctx.isSwapOrOffheapEnabled() && !deletedUnlocked() && hasValueUnlocked() && !detached()) {
+ boolean swapNeeded = (flags & IS_SWAPPING_REQUIRED) != 0;
+
+ if (cctx.isSwapOrOffheapEnabled() && !deletedUnlocked() && (hasValueUnlocked() || swapNeeded) && !detached()) {
assert Thread.holdsLock(this);
long expireTime = expireTimeExtras();
@@ -566,7 +571,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return;
}
- if (cctx.offheapTiered() && hasOffHeapPointer()) {
+ if (cctx.offheapTiered() && hasOffHeapPointer() && !swapNeeded) {
if (log.isDebugEnabled())
log.debug("Value did not change, skip write swap entry: " + this);
@@ -600,6 +605,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
keyClsLdrId,
valClsLdrId);
+ flags &= ~IS_SWAPPING_REQUIRED;
+
if (log.isDebugEnabled())
log.debug("Wrote swap entry: " + this);
}
@@ -945,7 +952,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hadValPtr = hasOffHeapPointer();
// Don't change version for read-through.
- update(ret, expTime, ttl, nextVer);
+ update(ret, expTime, ttl, nextVer, true);
if (hadValPtr && cctx.offheapTiered())
cctx.swap().removeOffheap(key);
@@ -1040,7 +1047,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
deletedUnlocked(true);
}
- update(ret, expTime, ttl, nextVer);
+ update(ret, expTime, ttl, nextVer, true);
touch = true;
@@ -1194,7 +1201,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (updateCntr != null && updateCntr != 0)
updateCntr0 = updateCntr;
- update(val, expireTime, ttl, newVer);
+ update(val, expireTime, ttl, newVer, true);
drReplicate(drType, val, newVer);
@@ -1356,7 +1363,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hadValPtr = hasOffHeapPointer();
- update(null, 0, 0, newVer);
+ update(null, 0, 0, newVer, true);
if (cctx.offheapTiered() && hadValPtr) {
boolean rmv = cctx.swap().removeOffheap(key);
@@ -1572,7 +1579,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
clearIndex(null);
- update(old, expireTime, ttl, ver);
+ update(old, expireTime, ttl, ver, true);
}
// Apply metrics.
@@ -1719,7 +1726,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert ttl != CU.TTL_ZERO;
- update(updated, expireTime, ttl, ver);
+ update(updated, expireTime, ttl, ver, true);
if (evt) {
CacheObject evtOld = null;
@@ -1756,7 +1763,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// in load methods without actually holding entry lock.
clearIndex(old);
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
+ update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
if (cctx.offheapTiered() && hasValPtr) {
boolean rmv = cctx.swap().removeOffheap(key);
@@ -2131,7 +2138,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
clearIndex(null);
- update(oldVal, initExpireTime, initTtl, ver);
+ update(oldVal, initExpireTime, initTtl, ver, true);
if (deletedUnlocked() && oldVal != null && !isInternal())
deletedUnlocked(false);
@@ -2345,7 +2352,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// in load methods without actually holding entry lock.
updateIndex(updated, newExpireTime, newVer, oldVal);
- update(updated, newExpireTime, newTtl, newVer);
+ update(updated, newExpireTime, newTtl, newVer, true);
updateCntr0 = nextPartCounter(topVer);
@@ -2430,7 +2437,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hasValPtr = hasOffHeapPointer();
// Clear value on backup. Entry will be removed from cache when it got evicted from queue.
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
+ update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
assert newSysTtl == CU.TTL_NOT_CHANGED;
assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
@@ -2707,7 +2714,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
- update(null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver, true);
deletedUnlocked(true);
@@ -2895,24 +2902,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param ttl Time to live.
* @param ver Update version.
*/
- protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) {
+ protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver, boolean addTracked) {
assert ver != null;
assert Thread.holdsLock(this);
assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
long oldExpireTime = expireTimeExtras();
- if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+ if (addTracked && oldExpireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
cctx.ttl().removeTrackedEntry(this);
value(val);
ttlAndExpireTimeExtras(ttl, expireTime);
- if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
- cctx.ttl().addTrackedEntry(this);
-
this.ver = ver;
+
+ if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
+ cctx.ttl().addTrackedEntry(this);
}
/**
@@ -2931,11 +2938,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* Update TTL is it is changed.
*
* @param expiryPlc Expiry policy.
- * @throws IgniteCheckedException If failed.
* @throws GridCacheEntryRemovedException If failed.
*/
- private void updateTtl(IgniteCacheExpiryPolicy expiryPlc)
- throws IgniteCheckedException, GridCacheEntryRemovedException {
+ private void updateTtl(IgniteCacheExpiryPolicy expiryPlc) throws GridCacheEntryRemovedException {
long ttl = expiryPlc.forAccess();
if (ttl != CU.TTL_NOT_CHANGED) {
@@ -2950,7 +2955,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param ttl Time to live.
*/
- private void updateTtl(long ttl) {
+ protected void updateTtl(long ttl) {
assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
assert Thread.holdsLock(this);
@@ -2970,6 +2975,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ttlAndExpireTimeExtras(ttl, expireTime);
+ if (cctx.isSwapOrOffheapEnabled())
+ flags |= IS_SWAPPING_REQUIRED;
+
if (expireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
cctx.ttl().addTrackedEntry(this);
}
@@ -3045,27 +3053,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
throws GridCacheEntryRemovedException, IgniteCheckedException {
assert heap || offheap || swap;
- try {
- if (heap) {
- GridTuple<CacheObject> val = peekGlobal(false, topVer, null, expiryPlc);
+ if (heap) {
+ GridTuple<CacheObject> val = peekGlobal(topVer, null, expiryPlc);
- if (val != null)
- return val.get();
- }
-
- if (offheap || swap) {
- GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
+ if (val != null)
+ return val.get();
+ }
- return e != null ? e.value() : null;
- }
+ if (offheap || swap) {
+ GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
- return null;
+ return e != null ? e.value() : null;
}
- catch (GridCacheFilterFailedException ignored) {
- assert false;
- return null;
- }
+ return null;
}
/** {@inheritDoc} */
@@ -3083,21 +3084,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
- * @param failFast Fail fast flag.
* @param topVer Topology version.
* @param filter Filter.
* @param expiryPlc Optional expiry policy.
* @return Peeked value.
- * @throws GridCacheFilterFailedException If filter failed.
* @throws GridCacheEntryRemovedException If entry got removed.
* @throws IgniteCheckedException If unexpected cache failure occurred.
*/
@SuppressWarnings({"RedundantTypeArguments"})
- @Nullable private GridTuple<CacheObject> peekGlobal(boolean failFast,
+ @Nullable private GridTuple<CacheObject> peekGlobal(
AffinityTopologyVersion topVer,
CacheEntryPredicate[] filter,
@Nullable IgniteCacheExpiryPolicy expiryPlc)
- throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException {
+ throws GridCacheEntryRemovedException, IgniteCheckedException {
if (!valid(topVer))
return null;
@@ -3124,8 +3123,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateTtl(expiryPlc);
}
+ if (val == null)
+ return null;
+
if (!cctx.isAll(this, filter))
- return F.t(CU.<CacheObject>failed(failFast));
+ return null;
if (F.isEmptyOrNulls(filter) || ver.equals(version()))
return F.t(val);
@@ -3224,7 +3226,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Override public synchronized CacheObject rawPut(CacheObject val, long ttl) {
CacheObject old = this.val;
- update(val, CU.toExpireTime(ttl), ttl, nextVersion());
+ update(val, CU.toExpireTime(ttl), ttl, nextVersion(), true);
return old;
}
@@ -3252,7 +3254,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateIndex(val, expTime, ver, null);
// Version does not change for load ops.
- update(val, expTime, ttl, ver);
+ update(val, expTime, ttl, ver, true);
boolean skipQryNtf = false;
@@ -3338,7 +3340,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(val,
unswapped.expireTime(),
unswapped.ttl(),
- unswapped.version()
+ unswapped.version(),
+ true
);
return true;
@@ -3397,7 +3400,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
// Version does not change for load ops.
- update(val, expTime, ttl, newVer);
+ update(val, expTime, ttl, newVer, true);
return newVer;
}
@@ -3612,7 +3615,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!obsolete()) {
if (cctx.deferredDelete() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
- update(null, 0L, 0L, ver0 = ver);
+ update(null, 0L, 0L, ver0 = ver, true);
deletedUnlocked(true);
@@ -3726,8 +3729,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl) {
+ @Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws GridCacheEntryRemovedException {
synchronized (this) {
+ checkObsolete();
+
updateTtl(ttl);
/*
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index bdb1f18..657cf8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
@@ -31,7 +34,7 @@ import org.jsr166.LongAdder8;
/**
* Eagerly removes expired entries from cache when
- * {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
+ * {@link CacheConfiguration#isEagerTtl()} flag is set.
*/
@SuppressWarnings("NakedNotify")
public class GridCacheTtlManager extends GridCacheManagerAdapter {
@@ -74,6 +77,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
* @param entry Entry to add.
*/
public void addTrackedEntry(GridCacheMapEntry entry) {
+ assert Thread.holdsLock(entry);
+ assert cleanupWorker != null;
+
pendingEntries.add(new EntryWrapper(entry));
}
@@ -82,10 +88,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
*/
public void removeTrackedEntry(GridCacheMapEntry entry) {
assert Thread.holdsLock(entry);
+ assert cleanupWorker != null;
pendingEntries.remove(new EntryWrapper(entry));
}
+ /**
+ * @return The size of pending entries.
+ */
+ public int pendingSize() {
+ return pendingEntries.sizex();
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
@@ -150,6 +164,50 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
}
/**
+ * @param cctx1 First cache context.
+ * @param key1 Left key to compare.
+ * @param cctx2 Second cache context.
+ * @param key2 Right key to compare.
+ * @return Comparison result.
+ */
+ private static int compareKeys(GridCacheContext cctx1, CacheObject key1, GridCacheContext cctx2, CacheObject key2) {
+ int key1Hash = key1.hashCode();
+ int key2Hash = key2.hashCode();
+
+ int res = Integer.compare(key1Hash, key2Hash);
+
+ if (res == 0) {
+ key1 = (CacheObject)cctx1.unwrapTemporary(key1);
+ key2 = (CacheObject)cctx2.unwrapTemporary(key2);
+
+ try {
+ byte[] key1ValBytes = key1.valueBytes(cctx1.cacheObjectContext());
+ byte[] key2ValBytes = key2.valueBytes(cctx2.cacheObjectContext());
+
+ // Must not do fair array comparison.
+ res = Integer.compare(key1ValBytes.length, key2ValBytes.length);
+
+ if (res == 0) {
+ for (int i = 0; i < key1ValBytes.length; i++) {
+ res = Byte.compare(key1ValBytes[i], key2ValBytes[i]);
+
+ if (res != 0)
+ break;
+ }
+ }
+
+ if (res == 0)
+ res = Boolean.compare(cctx1.isNear(), cctx2.isNear());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return res;
+ }
+
+ /**
* Entry wrapper.
*/
private static class EntryWrapper implements Comparable<EntryWrapper> {
@@ -175,7 +233,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
int res = Long.compare(expireTime, o.expireTime);
if (res == 0)
- res = Long.compare(entry.startVersion(), o.entry.startVersion());
+ res = compareKeys(entry.context(), entry.key(), o.entry.context(), o.entry.key());
return res;
}
@@ -190,18 +248,23 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
EntryWrapper that = (EntryWrapper)o;
- return expireTime == that.expireTime && entry.startVersion() == that.entry.startVersion();
-
+ return expireTime == that.expireTime &&
+ compareKeys(entry.context(), entry.key(), that.entry.context(), that.entry.key()) == 0;
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = (int)(expireTime ^ (expireTime >>> 32));
- res = 31 * res + (int)(entry.startVersion() ^ (entry.startVersion() >>> 32));
+ res = 31 * res + entry.key().hashCode();
return res;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EntryWrapper.class, this);
+ }
}
/**
@@ -230,7 +293,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
@Override public boolean add(EntryWrapper e) {
boolean res = super.add(e);
- assert res;
+ assert res : "Failed to add entry wrapper:" + e;
size.increment();
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 8e456e3..ee9525a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1030,26 +1030,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
GridCacheEntryEx entry = null;
try {
- if (swap) {
- while (true) {
- try {
+ while (true) {
+ try {
+ if (swap) {
entry = cache.entryEx(keys.get(i));
entry.unswap(false);
-
- break;
- }
- catch (GridCacheEntryRemovedException e) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry: " + entry);
}
+ else
+ entry = cache.peekEx(keys.get(i));
+
+ if (entry != null)
+ entry.updateTtl(vers.get(i), ttl);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // Retry
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry: " + entry);
}
}
- else
- entry = cache.peekEx(keys.get(i));
-
- if (entry != null)
- entry.updateTtl(vers.get(i), ttl);
}
finally {
if (entry != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 14e3d3e..fae8219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -580,7 +580,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
clearIndex(prev);
// Give to GC.
- update(null, 0L, 0L, ver);
+ update(null, 0L, 0L, ver, true);
if (swap) {
releaseSwap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 026fb4d..943a91a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -166,7 +166,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (isNew() || !valid(topVer)) {
// Version does not change for load ops.
- update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+ update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
if (cctx.deferredDelete() && !isNew() && !isInternal()) {
boolean deleted = val == null;
@@ -402,7 +402,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
primaryNode(primaryNodeId, topVer);
- update(val, expireTime, ttl, ver);
+ update(val, expireTime, ttl, ver, true);
if (cctx.deferredDelete() && !isInternal()) {
boolean deleted = val == null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0d8f795..df95e2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -809,161 +809,67 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
prj0 = prj0.keepBinary();
- final IgniteInternalCache<K, V> prj = prj0;
+ final IgniteInternalCache prj = prj0;
final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
try {
injectResources(keyValFilter);
- final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
-
- final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
-
final ExpiryPolicy plc = cctx.expiry();
final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
final boolean backups = qry.includeBackups() || cctx.isReplicated();
- final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
- new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- private IgniteBiTuple<K, V> next;
-
- private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
-
- private Iterator<K> iter;
-
- private GridDhtLocalPartition locPart;
+ Iterator<K> keyIter;
- {
- Integer part = qry.partition();
+ GridDhtLocalPartition locPart = null;
- if (part == null || dht == null)
- iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
- else if (part < 0 || part >= cctx.affinity().partitions())
- iter = F.emptyIterator();
- else {
- locPart = dht.topology().localPartition(part, topVer, false);
+ Integer part = qry.partition();
- // double check for owning state
- if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
- locPart.state() != OWNING)
- throw new GridDhtUnreservedPartitionException(part,
- cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
-
- iter = new Iterator<K>() {
- private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
+ if (part == null || cctx.isLocal())
+ keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+ else if (part < 0 || part >= cctx.affinity().partitions())
+ keyIter = F.emptyIterator();
+ else {
+ final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
- @Override public boolean hasNext() {
- return iter0.hasNext();
- }
+ locPart = dht.topology().localPartition(part, topVer, false);
- @Override public K next() {
- KeyCacheObject key = iter0.next();
+ // double check for owning state
+ if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING)
+ throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
+ "Partition can not be reserved");
- return (K)cctx.unwrapBinaryIfNeeded(key, true);
- }
+ final GridDhtLocalPartition locPart0 = locPart;
- @Override public void remove() {
- iter0.remove();
- }
- };
- }
+ keyIter = new Iterator<K>() {
+ private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
- advance();
+ @Override public boolean hasNext() {
+ return iter0.hasNext();
}
- @Override public boolean onHasNext() {
- return next != null;
+ @Override public K next() {
+ return (K)iter0.next();
}
- @Override public IgniteBiTuple<K, V> onNext() {
- if (next == null)
- throw new NoSuchElementException();
-
- IgniteBiTuple<K, V> next0 = next;
-
- advance();
-
- return next0;
+ @Override public void remove() {
+ iter0.remove();
}
+ };
+ }
- private void advance() {
- IgniteBiTuple<K, V> next0 = null;
-
- while (iter.hasNext()) {
- next0 = null;
-
- K key = iter.next();
-
- V val;
-
- try {
- GridCacheEntryEx entry = cache.peekEx(key);
-
- CacheObject cacheVal =
- entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
-
- val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true);
- }
- catch (GridCacheEntryRemovedException e) {
- val = null;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
-
- val = null;
- }
-
- if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
- dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = cctx.cache().expiryPolicy(plc);
- }
-
- if (val != null) {
- next0 = F.t(key, val);
-
- if (checkPredicate(next0))
- break;
- else
- next0 = null;
- }
- }
-
- next = next0 != null ?
- new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
- null;
-
- if (next == null)
- sendTtlUpdate();
- }
+ final GridDhtLocalPartition locPart0 = locPart;
+ final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+ new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), true) {
@Override protected void onClose() {
- sendTtlUpdate();
-
- if (locPart != null)
- locPart.release();
- }
-
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
+ super.onClose();
- expiryPlc = null;
- }
- }
-
- private boolean checkPredicate(Map.Entry<K, V> e) {
- if (keyValFilter != null) {
- Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, qry.keepBinary());
-
- return keyValFilter.apply(e0.getKey(), e0.getValue());
- }
-
- return true;
+ if (locPart0 != null)
+ locPart0.release();
}
};
@@ -975,10 +881,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
iters.add(heapIt);
if (cctx.isOffHeapEnabled())
- iters.add(offheapIterator(qry, backups));
+ iters.add(offheapIterator(qry, topVer, backups, plc));
if (cctx.swap().swapEnabled())
- iters.add(swapIterator(qry, backups));
+ iters.add(swapIterator(qry, topVer, backups, plc));
it = new CompoundIterator<>(iters);
}
@@ -1032,8 +938,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Swap iterator.
* @throws IgniteCheckedException If failed.
*/
- private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, boolean backups)
- throws IgniteCheckedException {
+ private GridIterator<IgniteBiTuple<K, V>> swapIterator(
+ GridCacheQueryAdapter<?> qry,
+ AffinityTopologyVersion topVer,
+ boolean backups,
+ ExpiryPolicy expPlc
+ ) throws IgniteCheckedException {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
Integer part = qry.partition();
@@ -1041,6 +951,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
cctx.swap().rawSwapIterator(part);
+ if (expPlc != null)
+ return scanExpiryIterator(
+ it,
+ topVer,
+ filter,
+ expPlc,
+ qry.keepBinary());
+
return scanIterator(it, filter, qry.keepBinary());
}
@@ -1049,9 +967,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param backups Include backups.
* @return Offheap iterator.
*/
- private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean backups) {
+ private GridIterator<IgniteBiTuple<K, V>> offheapIterator(
+ GridCacheQueryAdapter<?> qry,
+ AffinityTopologyVersion topVer,
+ boolean backups,
+ ExpiryPolicy expPlc
+ ) {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
+ if (expPlc != null) {
+ return scanExpiryIterator(
+ cctx.swap().rawOffHeapIterator(qry.partition(), true, backups),
+ topVer,
+ filter,
+ expPlc,
+ qry.keepBinary());
+ }
+
if (cctx.offheapTiered() && filter != null) {
OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary());
@@ -1126,6 +1058,48 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+ * Builds an expiry-aware key-value iterator on top of a raw iterator of byte-array entries.
+ * Only the key of each raw entry is used; the value is looked up again by
+ * {@code PeekValueExpiryAwareIterator}, which is created with {@code heapOnly = false}.
+ *
+ * @param it Raw iterator.
+ * @param topVer Topology version.
+ * @param filter Filter.
+ * @param expPlc Expiry policy.
+ * @param keepBinary Keep binary flag.
+ * @return Final key-value iterator.
+ */
+ private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator(
+ final Iterator<Map.Entry<byte[], byte[]>> it,
+ AffinityTopologyVersion topVer,
+ @Nullable final IgniteBiPredicate<K, V> filter,
+ ExpiryPolicy expPlc,
+ final boolean keepBinary
+ ) {
+ // Adapter that exposes only the keys of the raw entries.
+ Iterator <K> keyIter = new Iterator<K>() {
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public K next() {
+ try {
+ // Convert the raw key bytes to a cache key object, then unwrap it according to keepBinary.
+ KeyCacheObject key = cctx.toCacheKeyObject(it.next().getKey());
+
+ return (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ // Iterator.next() cannot throw a checked exception, so rethrow unchecked with the cause.
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ it.remove();
+ }
+ };
+
+ return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, false);
+ }
+
+ /**
* @param o Object to inject resources to.
* @throws IgniteCheckedException If failure occurred while injecting resources.
*/
@@ -3115,4 +3089,192 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
false,
keepBinary);
}
+
+ /**
+ * Closeable iterator that walks a key iterator, looks up the value of each key in the cache
+ * and returns the key-value pairs that have a non-null value and pass the optional filter.
+ * <p>
+ * Lookups are done with an {@link IgniteCacheExpiryPolicy} created from the supplied expiry
+ * policy. When a DHT cache is available, that policy is handed to
+ * {@code sendTtlUpdateRequest} whenever {@code readyToFlush(100)} returns {@code true},
+ * when the iteration is exhausted, and when the iterator is closed.
+ */
+ private class PeekValueExpiryAwareIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Expiry policy passed to the constructor; used to create {@link #expiryPlc}. */
+ private final ExpiryPolicy plc;
+
+ /** Cache used for entry lookups: the DHT cache when there is one, otherwise {@code cctx.cache()}. */
+ private final GridCacheAdapter cache;
+
+ /** Topology version passed to the entry {@code peek} call. */
+ private final AffinityTopologyVersion topVer;
+
+ /** DHT cache used to send TTL update requests, or {@code null} for a local cache. */
+ private final GridDhtCacheAdapter dht;
+
+ /** Key-value filter; when {@code null} every pair is accepted. */
+ private final IgniteBiPredicate<K, V> keyValFilter;
+
+ /** Heap only flag: selects {@code peekEx} (when set) or {@code entryEx} for the entry lookup. */
+ private boolean heapOnly;
+
+ /** Keep binary flag applied when a pair is unwrapped for the filter. */
+ private final boolean keepBinary;
+
+ /** Pair returned by the next {@code onNext()} call, or {@code null} when the iteration is finished. */
+ private IgniteBiTuple<K, V> next;
+
+ /** Current internal expiry policy; set to {@code null} once the final TTL update has been sent. */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
+ /** Source of the keys to iterate over. */
+ private Iterator<K> keyIt;
+
+ /**
+ * Creates the iterator and positions it on the first matching entry.
+ *
+ * @param keyIt Key iterator.
+ * @param plc Expiry policy.
+ * @param topVer Topology version.
+ * @param keyValFilter Key-value filter.
+ * @param keepBinary Keep binary flag from the query.
+ * @param heapOnly Heap only flag.
+ */
+ private PeekValueExpiryAwareIterator(
+ Iterator<K> keyIt,
+ ExpiryPolicy plc,
+ AffinityTopologyVersion topVer,
+ IgniteBiPredicate<K, V> keyValFilter,
+ boolean keepBinary,
+ boolean heapOnly
+ ) {
+ this.keyIt = keyIt;
+ this.plc = plc;
+ this.topVer = topVer;
+ this.keyValFilter = keyValFilter;
+ this.heapOnly = heapOnly;
+
+ // No DHT cache for a local cache; for a near cache use the DHT cache behind it.
+ dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+ cache = dht != null ? dht : cctx.cache();
+
+ this.keepBinary = keepBinary;
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+
+ // Pre-fetch the first pair so that onHasNext() can answer without further work.
+ advance();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onHasNext() {
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBiTuple<K, V> onNext() {
+ if (next == null)
+ throw new NoSuchElementException();
+
+ IgniteBiTuple<K, V> next0 = next;
+
+ // Look ahead to the following pair before handing out the current one.
+ advance();
+
+ return next0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onClose() {
+ sendTtlUpdate();
+ }
+
+ /**
+ * Moves the iterator to the next cache entry.
+ * Keys whose value is {@code null} or whose pair is rejected by the filter are skipped.
+ * When no further pair is found, {@link #next} becomes {@code null} and the TTL update is sent.
+ */
+ private void advance() {
+ IgniteBiTuple<K, V> next0 = null;
+
+ while (keyIt.hasNext()) {
+ next0 = null;
+
+ K key = keyIt.next();
+
+ CacheObject val;
+
+ try {
+ val = value(key);
+ }
+ catch (IgniteCheckedException e) {
+ // A failed lookup is logged at debug level only and the key is skipped.
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
+
+ val = null;
+ }
+
+ // Once the policy reports it is ready to flush, send the TTL update and start a fresh policy.
+ if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = cctx.cache().expiryPolicy(plc);
+ }
+
+ if (val != null) {
+ next0 = F.t(
+ (K)cctx.unwrapBinaryIfNeeded(key, true),
+ (V)cctx.unwrapBinaryIfNeeded(val, true));
+
+ if (checkPredicate(next0))
+ break;
+ else
+ next0 = null;
+ }
+ }
+
+ // The accepted pair is copied into a new tuple instance.
+ next = next0 != null ?
+ new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+ null;
+
+ // Iteration is exhausted: send whatever the expiry policy still holds.
+ if (next == null)
+ sendTtlUpdate();
+ }
+
+ /**
+ * Sends TTL update.
+ * Does nothing when there is no DHT cache or the expiry policy is {@code null}.
+ * The policy is set to {@code null} afterwards, so a repeated call sends nothing.
+ */
+ private void sendTtlUpdate() {
+ if (dht != null && expiryPlc != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+ }
+
+ /**
+ * Looks up the entry for the key and peeks its value.
+ *
+ * @param key Key.
+ * @return Value, or {@code null} if there is no entry or, in heap-only mode, the entry was removed.
+ * @throws IgniteCheckedException If failed to peek value.
+ */
+ private CacheObject value(K key) throws IgniteCheckedException {
+ while (true) {
+ try {
+ GridCacheEntryEx entry = heapOnly ? cache.peekEx(key) : cache.entryEx(key);
+
+ // Outside heap-only mode the entry is unswapped before peeking when an expiry policy is set.
+ if (expiryPlc != null && !heapOnly)
+ entry.unswap();
+
+ return entry != null ? entry.peek(true, !heapOnly, !heapOnly, topVer, expiryPlc) : null;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // Heap-only mode gives up on a removed entry; otherwise the loop retries the lookup.
+ if (heapOnly)
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Check key-value predicate.
+ *
+ * @param e Entry to check.
+ * @return Filter evaluation result, or {@code true} when no filter is set.
+ */
+ private boolean checkPredicate(Map.Entry<K, V> e) {
+ if (keyValFilter != null) {
+ // The filter receives the pair unwrapped according to the keep binary flag.
+ Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, keepBinary);
+
+ return keyValFilter.apply(e0.getKey(), e0.getValue());
+ }
+
+ return true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..24a7478
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheAtomicLocalExpiryPolicyTest} whose {@link #memoryMode()}
+ * returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicLocalOffheapExpiryPolicyTest extends IgniteCacheAtomicLocalExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..45a0183
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheAtomicExpiryPolicyTest} whose {@link #memoryMode()}
+ * returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicOffheapExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..594a6d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderOffheapExpiryPolicyTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest} whose {@link #memoryMode()}
+ * returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicPrimaryWriteOrderOffheapExpiryPolicyTest extends
+ IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..51d8ba2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderWithStoreOffheapExpiryPolicyTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest} whose
+ * {@link #memoryMode()} returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicPrimaryWriteOrderWithStoreOffheapExpiryPolicyTest extends
+ IgniteCacheAtomicPrimaryWriteOrderWithStoreExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..a65e155
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheAtomicReplicatedExpiryPolicyTest} whose {@link #memoryMode()}
+ * returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicReplicatedOffheapExpiryPolicyTest extends IgniteCacheAtomicReplicatedExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..ffb170b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicWithStoreOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheAtomicWithStoreExpiryPolicyTest} whose {@link #memoryMode()}
+ * returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheAtomicWithStoreOffheapExpiryPolicyTest extends IgniteCacheAtomicWithStoreExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 7d22206..f4cc025 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -33,20 +33,26 @@ import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
@@ -95,7 +101,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
storeMap.clear();
}
-
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
CacheConfiguration cfg = super.cacheConfiguration(gridName);
@@ -105,6 +110,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
cfg.setExpiryPolicyFactory(factory);
+ cfg.setMemoryMode(memoryMode());
+
+ if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED)
+ cfg.setOffHeapMaxMemory(0);
+
if (disableEagerTtl)
cfg.setEagerTtl(false);
@@ -112,7 +122,44 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
- * @throws Exception If failed.
+ * @return Cache memory mode.
+ */
+ protected CacheMemoryMode memoryMode() {
+ // Default for this base test; subclasses override this method to run with another memory mode.
+ return CacheMemoryMode.ONHEAP_TIERED;
+ }
+
+ /**
+ * Puts the same key 1000 times through a cache view that uses a {@link ModifiedExpiryPolicy}
+ * of 60 hours and checks that the pending size reported by {@code context().ttl()} is at most 1.
+ * Then removes the key and checks that the pending size is 0.
+ *
+ * @throws Exception if failed.
+ */
+ public void testCreateUpdate0() throws Exception {
+ startGrids(1);
+
+ // Duration in hours (see TimeUnit.HOURS below).
+ long ttl = 60L;
+
+ final String key = "key1";
+
+ final IgniteCache<String, String> cache = jcache();
+
+ for (int i = 0; i < 1000; i++) {
+ // A new cache view with its own expiry policy instance is created for every update.
+ final IgniteCache<String, String> cache0 = cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(TimeUnit.HOURS, ttl)));
+
+ cache0.put(key, key);
+
+ info("PUT DONE");
+ }
+
+ int pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize();
+
+ // Repeated updates of a single key must not accumulate pending entries.
+ assertTrue("Too many pending entries: " + pSize, pSize <= 1);
+
+ cache.remove(key);
+
+ pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize();
+
+ assertEquals(0, pSize);
+ }
+
+ /**
+ * @throws Exception If failed.
*/
public void testZeroOnCreate() throws Exception {
factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO);
@@ -349,6 +396,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
Collection<Integer> putKeys = keys();
+ info("Put keys: " + putKeys);
+
for (final Integer key : putKeys)
cache.put(key, key);
@@ -359,10 +408,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
while (it.hasNext())
itKeys.add(it.next().getKey());
+ info("It keys: " + itKeys);
+
assertTrue(itKeys.size() >= putKeys.size());
- for (Integer key : itKeys)
+ for (Integer key : itKeys) {
+ info("Checking iterator key: " + key);
+
checkTtl(key, 62_000L, true);
+ }
}
/**
@@ -1016,7 +1070,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
ClusterNode node = grid(i).cluster().localNode();
for (Integer key : keys) {
- Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP);
+ Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP);
if (val != null) {
log.info("Unexpected value [grid=" + i +
@@ -1059,51 +1113,54 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache();
- GridCacheEntryEx e = cache.peekEx(key);
+ if (cache.context().isNear())
+ cache = cache.context().near().dht();
- if (e == null && cache.context().isNear())
- e = cache.context().near().dht().peekEx(key);
+ while (true) {
+ try {
+ GridCacheEntryEx e = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ?
+ cache.peekEx(key) : cache.entryEx(key);
- if (e != null && e.deleted()) {
- assertEquals(0, e.ttl());
+ if (e != null && e.deleted()) {
+ assertEquals(0, e.ttl());
- assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+ assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
- continue;
- }
+ continue;
+ }
- if (e == null)
- assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
- else {
- found = true;
+ if (e == null)
+ assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+ else {
+ e.unswap();
- if (wait) {
- final GridCacheEntryEx e0 = e;
+ found = true;
- GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- try {
- return e0.ttl() == ttl;
- }
- catch (Exception e) {
- fail("Unexpected error: " + e);
+ if (wait)
+ waitTtl(cache, key, ttl);
- return true;
- }
- }
- }, 3000);
- }
+ boolean primary = cache.affinity().isPrimary(grid.localNode(), key);
+ boolean backup = cache.affinity().isBackup(grid.localNode(), key);
- boolean primary = cache.affinity().isPrimary(grid.localNode(), key);
- boolean backup = cache.affinity().isBackup(grid.localNode(), key);
+ assertEquals("Unexpected ttl [grid=" + i + ", nodeId=" + grid.getLocalNodeId() +
+ ", key=" + key + ", e=" + e + ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
- assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e +
- ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
+ if (ttl > 0)
+ assertTrue(e.expireTime() > 0);
+ else
+ assertEquals(0, e.expireTime());
+ }
- if (ttl > 0)
- assertTrue(e.expireTime() > 0);
- else
- assertEquals(0, e.expireTime());
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ info("RETRY");
+ // Retry.
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ // No need to check.
+ break;
+ }
}
}
@@ -1111,6 +1168,40 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
+ * Waits until the TTL of the entry for the given key equals the expected value.
+ * The condition is passed to {@code GridTestUtils.waitForCondition} with a timeout
+ * argument of 3000 (presumably milliseconds - confirm against GridTestUtils).
+ *
+ * @param cache Cache.
+ * @param key Key.
+ * @param ttl TTL to wait.
+ * @throws IgniteInterruptedCheckedException If wait has been interrupted.
+ */
+ private void waitTtl(final GridCacheAdapter<Object, Object> cache, final Object key, final long ttl)
+ throws IgniteInterruptedCheckedException {
+ GridTestUtils.waitForCondition(new PAX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ // In ONHEAP_TIERED mode the entry is peeked; in other modes entryEx is used instead.
+ entry = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ?
+ cache.peekEx(key) : cache.entryEx(key);
+
+ assert entry != null;
+
+ // Unswap the entry before reading its TTL.
+ entry.unswap();
+
+ return entry.ttl() == ttl;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // Retry.
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ // Invalid partition on this node: report the condition as satisfied so the wait ends.
+ return true;
+ }
+ }
+ }
+ }, 3000);
+ }
+
+ /**
*
*/
private static class GetEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 4f5419b..e6e2a0e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -43,6 +43,19 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheTxWithStoreExpiryPolicyTest.class);
suite.addTestSuite(IgniteCacheTxReplicatedExpiryPolicyTest.class);
+ // Offheap tests.
+ suite.addTestSuite(IgniteCacheAtomicLocalOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicWithStoreOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderWithStoreOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicReplicatedOffheapExpiryPolicyTest.class);
+
+ suite.addTestSuite(IgniteCacheTxLocalOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheTxOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheTxWithStoreOffheapExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheTxReplicatedOffheapExpiryPolicyTest.class);
+
suite.addTestSuite(IgniteCacheAtomicExpiryPolicyWithStoreTest.class);
suite.addTestSuite(IgniteCacheTxExpiryPolicyWithStoreTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..60fa556
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Variant of {@link IgniteCacheTxLocalExpiryPolicyTest} whose {@link #memoryMode()}
+ * returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheTxLocalOffheapExpiryPolicyTest extends IgniteCacheTxLocalExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..c118457
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Subclass of {@link IgniteCacheTxExpiryPolicyTest} whose {@link #memoryMode()} returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheTxOffheapExpiryPolicyTest extends IgniteCacheTxExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..1ee7c3c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Subclass of {@link IgniteCacheTxReplicatedExpiryPolicyTest} whose {@link #memoryMode()} returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheTxReplicatedOffheapExpiryPolicyTest extends IgniteCacheTxReplicatedExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxWithStoreOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxWithStoreOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxWithStoreOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..5ffd052
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxWithStoreOffheapExpiryPolicyTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ * Subclass of {@link IgniteCacheTxWithStoreExpiryPolicyTest} whose {@link #memoryMode()} returns {@link CacheMemoryMode#OFFHEAP_TIERED}.
+ */
+public class IgniteCacheTxWithStoreOffheapExpiryPolicyTest extends IgniteCacheTxWithStoreExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode memoryMode() {
+ return CacheMemoryMode.OFFHEAP_TIERED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
index 982bd4c..69ca67e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheTestSuite.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCa
import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheOffHeapTieredBinarySelfTest;
import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesNearPartitionedByteArrayValuesSelfTest;
import org.apache.ignite.internal.processors.cache.binary.distributed.dht.GridCacheBinariesPartitionedOnlyByteArrayValuesSelfTest;
+import org.apache.ignite.internal.processors.cache.expiry.IgniteCacheAtomicLocalOffheapExpiryPolicyTest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -69,6 +70,7 @@ public class IgniteBinaryCacheTestSuite extends TestSuite {
ignoredTests.add(GridCacheOffHeapTieredAtomicSelfTest.class);
ignoredTests.add(GridCacheAffinityRoutingSelfTest.class);
ignoredTests.add(IgniteCacheAtomicLocalExpiryPolicyTest.class);
+ ignoredTests.add(IgniteCacheAtomicLocalOffheapExpiryPolicyTest.class);
ignoredTests.add(GridCacheEntryMemorySizeSelfTest.class);
// Tests that are not ready to be used with BinaryMarshaller
http://git-wip-us.apache.org/repos/asf/ignite/blob/078689b2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
index 364c992..ad04a51 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
@@ -321,7 +321,14 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
- assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size());
+ List<Cache.Entry<Integer, Person>> entries = qryCursor2.getAll();
+
+ assertEquals(setPart ? 1 : 3, entries.size());
+
+ for (Cache.Entry<Integer, Person> entry : entries) {
+ assertEquals(Integer.class, entry.getKey().getClass());
+ assertEquals(Person.class, entry.getValue().getClass());
+ }
}
/**