You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/13 11:52:07 UTC
incubator-ignite git commit: # ignite-876
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-876 [created] b16fd589c
# ignite-876
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b16fd589
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b16fd589
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b16fd589
Branch: refs/heads/ignite-876
Commit: b16fd589c7ab493f60ffb77a9fa2930fbedaabd5
Parents: bbc21a6
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 13 12:51:34 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 13 12:51:34 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 149 ++++++++++++-------
1 file changed, 96 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b16fd589/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 71858d1..bfff2d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -488,34 +488,34 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (cctx.isSwapOrOffheapEnabled() && !deletedUnlocked() && hasValueUnlocked() && !detached()) {
assert Thread.holdsLock(this);
- long expireTime = expireTimeExtras();
-
- if (expireTime > 0 && U.currentTimeMillis() >= expireTime) { // Don't swap entry if it's expired.
- // Entry might have been updated.
- if (cctx.offheapTiered()) {
- cctx.swap().removeOffheap(key);
+ if (cctx.offheapTiered()) {
+ if (val == null && valPtr != 0) {
+ if (log.isDebugEnabled())
+ log.debug("Value did not change, skip write swap entry: " + this);
- valPtr = 0;
+ if (cctx.swap().offheapEvictionEnabled())
+ cctx.swap().enableOffheapEviction(key());
}
return;
}
- if (val == null && cctx.offheapTiered() && valPtr != 0) {
- if (log.isDebugEnabled())
- log.debug("Value did not change, skip write swap entry: " + this);
-
- if (cctx.swap().offheapEvictionEnabled())
- cctx.swap().enableOffheapEviction(key());
+ long expireTime = expireTimeExtras();
+ if (expireTime > 0 && U.currentTimeMillis() >= expireTime) // Don't swap entry if it's expired.
return;
- }
IgniteUuid valClsLdrId = null;
+ IgniteUuid keyClsLdrId = null;
- if (val != null) {
- valClsLdrId = cctx.deploy().getClassLoaderId(
- val.value(cctx.cacheObjectContext(), false).getClass().getClassLoader());
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
+ if (val != null) {
+ valClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+ }
+
+ keyClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false)));
}
IgniteBiTuple<byte[], Byte> valBytes = valueBytes0();
@@ -526,7 +526,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
ver,
ttlExtras(),
expireTime,
- cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))),
+ keyClsLdrId,
valClsLdrId);
if (log.isDebugEnabled())
@@ -816,13 +816,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Update indexes before actual write to entry.
updateIndex(ret, expTime, nextVer, prevVal);
- boolean hadValPtr = valPtr != 0;
-
// Don't change version for read-through.
update(ret, expTime, ttl, nextVer);
- if (hadValPtr && cctx.offheapTiered())
- cctx.swap().removeOffheap(key);
+ updateOffheap(ret, expTime, ttl);
if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
deletedUnlocked(false);
@@ -903,6 +900,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
update(ret, expTime, ttl, nextVer);
+ updateOffheap(ret, expTime, ttl);
+
touch = true;
// If value was set - return, otherwise try again.
@@ -1037,6 +1036,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
update(val, expireTime, ttl, newVer);
+ updateOffheap(val, expireTime, ttl);
+
drReplicate(drType, val, newVer);
recordNodeId(affNodeId, topVer);
@@ -1164,15 +1165,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// can be updated without actually holding entry lock.
clearIndex(old);
- boolean hadValPtr = valPtr != 0;
-
update(null, 0, 0, newVer);
- if (cctx.offheapTiered() && hadValPtr) {
- boolean rmv = cctx.swap().removeOffheap(key);
-
- assert rmv;
- }
+ if (cctx.offheapTiered())
+ cctx.swap().removeOffheap(key);
if (cctx.deferredDelete() && !detached() && !isInternal()) {
if (!deletedUnlocked()) {
@@ -1351,6 +1347,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
clearIndex(null);
update(old, expireTime, ttl, ver);
+
+ updateOffheap(old, expireTime, ttl);
}
// Apply metrics.
@@ -1495,6 +1493,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
update(updated, expireTime, ttl, ver);
+ updateOffheap(updated, expireTime, ttl);
+
if (evt) {
CacheObject evtOld = null;
@@ -1521,19 +1521,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Must persist inside synchronization in non-tx mode.
cctx.store().remove(null, keyValue(false));
- boolean hasValPtr = valPtr != 0;
-
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(old);
update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
- if (cctx.offheapTiered() && hasValPtr) {
- boolean rmv = cctx.swap().removeOffheap(key);
-
- assert rmv;
- }
+ if (cctx.offheapTiered())
+ cctx.swap().removeOffheap(key);
if (evt) {
CacheObject evtOld = null;
@@ -1841,6 +1836,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
update(oldVal, initExpireTime, initTtl, ver);
+ updateOffheap(oldVal, initExpireTime, initTtl);
+
if (deletedUnlocked() && oldVal != null && !isInternal())
deletedUnlocked(false);
}
@@ -2052,6 +2049,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
update(updated, newExpireTime, newTtl, newVer);
+ updateOffheap(updated, newExpireTime, newTtl);
+
drReplicate(drType, updated, newVer);
recordNodeId(affNodeId, topVer);
@@ -2122,19 +2121,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
enqueueVer = newVer;
- boolean hasValPtr = valPtr != 0;
-
// Clear value on backup. Entry will be removed from cache when it got evicted from queue.
update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
assert newSysTtl == CU.TTL_NOT_CHANGED;
assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
- if (cctx.offheapTiered() && hasValPtr) {
- boolean rmv = cctx.swap().removeOffheap(key);
-
- assert rmv;
- }
+ if (cctx.offheapTiered())
+ cctx.swap().removeOffheap(key);
clearReaders();
@@ -2989,6 +2983,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Version does not change for load ops.
update(val, expTime, ttl, ver);
+ updateOffheap(val, expTime, ttl);
+
boolean skipQryNtf = false;
if (val == null) {
@@ -3036,11 +3032,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
// Version does not change for load ops.
- update(val,
- unswapped.expireTime(),
- unswapped.ttl(),
- unswapped.version()
- );
+ update(val, unswapped.expireTime(), unswapped.ttl(), unswapped.version());
return true;
}
@@ -3093,6 +3085,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Version does not change for load ops.
update(val, expTime, ttl, newVer);
+
+ updateOffheap(val, expTime, ttl);
}
return true;
@@ -3647,6 +3641,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return true;
}
+ else if (cctx.offheapTiered())
+ cctx.swap().removeOffheap(key);
}
}
else {
@@ -3690,8 +3686,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return true;
}
- else
+ else {
+ if (cctx.offheapTiered())
+ cctx.swap().removeOffheap(key);
+
return false;
+ }
}
}
}
@@ -3722,10 +3722,17 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (!isStartVersion() && hasValueUnlocked()) {
IgniteUuid valClsLdrId = null;
+ IgniteUuid keyClsLdrId = null;
- if (val != null)
- valClsLdrId = cctx.deploy().getClassLoaderId(
- U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
+ if (val != null) {
+ valClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+ }
+
+ keyClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false)));
+ }
IgniteBiTuple<byte[], Byte> valBytes = valueBytes0();
@@ -3736,7 +3743,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
ver,
ttlExtras(),
expireTimeExtras(),
- cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))),
+ keyClsLdrId,
valClsLdrId);
}
@@ -4108,6 +4115,42 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return extras != null ? extras.size() : 0;
}
+ /**
+ * @param val Value.
+ * @param expireTime Expire time.
+ * @param ttl TTL.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void updateOffheap(@Nullable CacheObject val, long expireTime, long ttl) throws IgniteCheckedException {
+ if (cctx.offheapTiered() && val != null) {
+ IgniteUuid valClsLdrId;
+ IgniteUuid keyClsLdrId;
+
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
+ valClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+
+ keyClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false)));
+ }
+ else {
+ valClsLdrId = null;
+ keyClsLdrId = null;
+ }
+
+ byte[] valBytes = val.valueBytes(cctx.cacheObjectContext());
+
+ cctx.swap().write(key(),
+ ByteBuffer.wrap(valBytes),
+ val.type(),
+ ver,
+ ttl,
+ expireTime,
+ keyClsLdrId,
+ valClsLdrId);
+ }
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
// Identity comparison left on purpose.