You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/04/09 02:19:04 UTC
[2/9] ignite git commit: IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686.
IGNITE-8018 Optimized GridCacheMapEntry initialValue() - Fixes #3686.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57aecd7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57aecd7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57aecd7a
Branch: refs/heads/ignite-8159
Commit: 57aecd7ab2bf5f0836aabf40e4b4051fd07228d2
Parents: 84a40e5
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Apr 6 13:49:10 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 6 13:49:10 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 158 +++++++++++++++----
.../colocated/GridDhtDetachedCacheEntry.java | 3 +-
.../distributed/near/GridNearCacheEntry.java | 3 +-
3 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 74dabe9..a6ef0d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -2699,40 +2700,80 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
) throws IgniteCheckedException, GridCacheEntryRemovedException {
ensureFreeSpace();
+ boolean deferred = false;
+ boolean obsolete = false;
+
+ GridCacheVersion oldVer = null;
+
lockEntry();
try {
checkObsolete();
+ boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+
+ long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+
+ val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+
+ final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0);
+
boolean update;
- boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+ IgnitePredicate<CacheDataRow> p = new IgnitePredicate<CacheDataRow>() {
+ @Override public boolean apply(@Nullable CacheDataRow row) {
+ boolean update0;
+
+ GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver;
- if (cctx.group().persistenceEnabled()) {
- unswap(false);
+ boolean isStartVer = currentVer.nodeOrder() == cctx.localNode().order()
+ && currentVer.order() == startVer;
- if (!isNew()) {
- if (cctx.atomic())
- update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0;
+ if (cctx.group().persistenceEnabled()) {
+ if (!isStartVer) {
+ if (cctx.atomic())
+ update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0;
+ else
+ update0 = currentVer.compareTo(ver) < 0;
+ }
+ else
+ update0 = true;
+ }
else
- update = this.ver.compareTo(ver) < 0;
+ update0 = isStartVer;
+
+ update0 |= (!preload && deletedUnlocked());
+
+ return update0;
}
- else
- update = true;
- }
- else
- update = isNew() && !cctx.offheap().containsKey(this);
+ };
- update |= !preload && deletedUnlocked();
+ if (unswapped) {
+ update = p.apply(null);
- if (update) {
- long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+ if (update) {
+ // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value.
+ long oldExpTime = expireTimeUnlocked();
+ long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis());
- val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+ if (delta < 0) {
+ if (onExpired(this.val, null)) {
+ if (cctx.deferredDelete()) {
+ deferred = true;
+ oldVer = this.ver;
+ }
+ else if (val == null)
+ obsolete = true;
+ }
+ }
- if (val != null)
storeValue(val, expTime, ver, null);
+ }
+ }
+ else // Optimization to access storage only once.
+ update = storeValue(val, expTime, ver, null, p);
+ if (update) {
update(val, expTime, ttl, ver, true);
boolean skipQryNtf = false;
@@ -2797,6 +2838,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
finally {
unlockEntry();
+
+ // It is necessary to execute these callbacks outside of lock to avoid deadlocks.
+
+ if (obsolete) {
+ onMarkedObsolete();
+
+ cctx.cache().removeEntry(this);
+ }
+
+ if (deferred) {
+ assert oldVer != null;
+
+ cctx.onDeferredDelete(this, oldVer);
+ }
}
}
@@ -3516,14 +3571,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param oldRow Old row if available.
* @throws IgniteCheckedException If update failed.
*/
- protected void storeValue(CacheObject val,
+ protected boolean storeValue(CacheObject val,
long expireTime,
GridCacheVersion ver,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
- assert lock.isHeldByCurrentThread();
assert val != null : "null values in update for key: " + key;
- cctx.offheap().invoke(cctx, key, localPartition(), new UpdateClosure(this, val, ver, expireTime));
+ return storeValue(val, expireTime, ver, oldRow, null);
+ }
+
+ /**
+ * Stores value in offheap.
+ *
+ * @param val Value.
+ * @param expireTime Expire time.
+ * @param ver New entry version.
+ * @param oldRow Old row if available.
+ * @param predicate Optional predicate.
+ * @throws IgniteCheckedException If update failed.
+ * @return {@code True} if storage was modified.
+ */
+ protected boolean storeValue(
+ @Nullable CacheObject val,
+ long expireTime,
+ GridCacheVersion ver,
+ @Nullable CacheDataRow oldRow,
+ @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
+ assert lock.isHeldByCurrentThread();
+
+ UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
+
+ cctx.offheap().invoke(cctx, key, localPartition(), closure);
+
+ return closure.treeOp != IgniteTree.OperationType.NOOP;
}
/**
@@ -4295,7 +4375,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private final GridCacheMapEntry entry;
/** */
- private final CacheObject val;
+ @Nullable private final CacheObject val;
/** */
private final GridCacheVersion ver;
@@ -4304,6 +4384,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private final long expireTime;
/** */
+ @Nullable private final IgnitePredicate<CacheDataRow> predicate;
+
+ /** */
private CacheDataRow newRow;
/** */
@@ -4317,31 +4400,44 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param val New value.
* @param ver New version.
* @param expireTime New expire time.
+ * @param predicate Optional predicate.
*/
- UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) {
+ UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
+ @Nullable IgnitePredicate<CacheDataRow> predicate) {
this.entry = entry;
this.val = val;
this.ver = ver;
this.expireTime = expireTime;
+ this.predicate = predicate;
}
/** {@inheritDoc} */
@Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
this.oldRow = oldRow;
+ if (predicate != null && !predicate.apply(oldRow)) {
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ return;
+ }
+
if (oldRow != null)
oldRow.key(entry.key);
- newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
- entry.cctx,
- entry.key,
- val,
- ver,
- expireTime,
- oldRow);
+ if (val != null) {
+ newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+ entry.cctx,
+ entry.key,
+ val,
+ ver,
+ expireTime,
+ oldRow);
- treeOp = oldRow != null && oldRow.link() == newRow.link() ?
- IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ }
+ else
+ treeOp = oldRow != null ? IgniteTree.OperationType.REMOVE : IgniteTree.OperationType.NOOP;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 3536908..d02015b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -65,10 +65,11 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void storeValue(CacheObject val,
+ @Override protected boolean storeValue(CacheObject val,
long expireTime,
GridCacheVersion ver,
CacheDataRow oldRow) throws IgniteCheckedException {
+ return false;
// No-op for detached entries, index is updated on primary nodes.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57aecd7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 322e63c..fb41f5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -458,7 +458,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected void storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) {
+ @Override protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver, CacheDataRow oldRow) {
+ return false;
// No-op: queries are disabled for near cache.
}