You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/09 11:46:55 UTC
ignite git commit: ignite-4652 Atomic update refactoring to use
BPlusTree.invoke.
Repository: ignite
Updated Branches:
refs/heads/ignite-4652 f979979a1 -> c4976dd26
ignite-4652 Atomic update refactoring to use BPlusTree.invoke.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c4976dd2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c4976dd2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c4976dd2
Branch: refs/heads/ignite-4652
Commit: c4976dd2603461f359485c2a8189ce839ab0b1c1
Parents: f979979
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 9 13:29:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 9 14:46:26 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 281 ++++++++++++-------
.../cache/IgniteCacheOffheapManagerImpl.java | 5 +-
2 files changed, 176 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c4976dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d1e07c6..5b7f4a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1714,108 +1714,106 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
keepBinary);
}
- if (updateRes.success()) {
- if (c.op == GridCacheOperation.UPDATE) {
- assert (isNear() && val != null) || c.newRow != null : c;
-
- updateVal = isNear() ? val : c.newRow.value();
+ if (c.op == GridCacheOperation.UPDATE) {
+ updateVal = val;
- assert updateVal != null : c;
+ assert updateVal != null : c;
- drReplicate(drType, updateVal, updateVer, topVer);
+ drReplicate(drType, updateVal, updateVer, topVer);
- recordNodeId(affNodeId, topVer);
+ recordNodeId(affNodeId, topVer);
- if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
- if (evtOld == null)
- evtOld = cctx.unwrapTemporary(oldVal);
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+ if (evtOld == null)
+ evtOld = cctx.unwrapTemporary(oldVal);
- cctx.events().addEvent(partition(),
- key,
- evtNodeId,
- null,
- newVer,
- EVT_CACHE_OBJECT_PUT,
- updateVal,
- true,
- evtOld,
- evtOld != null,
- subjId,
- null,
- taskName,
- keepBinary);
- }
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
+ null,
+ newVer,
+ EVT_CACHE_OBJECT_PUT,
+ updateVal,
+ true,
+ evtOld,
+ evtOld != null,
+ subjId,
+ null,
+ taskName,
+ keepBinary);
}
- else {
- assert c.op == GridCacheOperation.DELETE : c.op;
+ }
+ else {
+ assert c.op == GridCacheOperation.DELETE : c.op;
- clearReaders();
+ clearReaders();
- drReplicate(drType, null, newVer, topVer);
+ drReplicate(drType, null, newVer, topVer);
- recordNodeId(affNodeId, topVer);
+ recordNodeId(affNodeId, topVer);
- if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
- if (evtOld == null)
- evtOld = cctx.unwrapTemporary(oldVal);
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+ if (evtOld == null)
+ evtOld = cctx.unwrapTemporary(oldVal);
- cctx.events().addEvent(partition(),
- key,
- evtNodeId,
- null, newVer,
- EVT_CACHE_OBJECT_REMOVED,
- null, false,
- evtOld, evtOld != null,
- subjId,
- null,
- taskName,
- keepBinary);
- }
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
+ null, newVer,
+ EVT_CACHE_OBJECT_REMOVED,
+ null, false,
+ evtOld, evtOld != null,
+ subjId,
+ null,
+ taskName,
+ keepBinary);
}
+ }
- updateMetrics(c.op, metrics);
+ updateMetrics(c.op, metrics);
- // Continuous query filter should be perform under lock.
- if (lsnrs != null) {
- CacheObject evtVal = cctx.unwrapTemporary(updateVal);
- CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
+ // Continuous query filter should be performed under lock.
+ if (lsnrs != null) {
+ CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+ CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
- cctx.continuousQueries().onEntryUpdated(lsnrs,
+ cctx.continuousQueries().onEntryUpdated(lsnrs,
+ key,
+ evtVal,
+ evtOldVal,
+ internal,
+ partition(),
+ primary,
+ false,
+ c.updateRes.updateCounter(),
+ fut,
+ topVer);
+ }
+
+ cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
+
+ if (intercept) {
+ if (c.op == GridCacheOperation.UPDATE) {
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+ cctx,
key,
- evtVal,
- evtOldVal,
- internal,
- partition(),
- primary,
- false,
- c.updateRes.updateCounter(),
- fut,
- topVer);
+ null,
+ updateVal,
+ null,
+ keepBinary,
+ c.updateRes.updateCounter()));
}
+ else {
+ assert c.op == GridCacheOperation.DELETE : c.op;
- cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
-
- if (intercept) {
- if (op == GridCacheOperation.UPDATE) {
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
- cctx,
- key,
- null,
- updateVal,
- null,
- keepBinary,
- c.updateRes.updateCounter()));
- }
- else {
- cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
- cctx,
- key,
- null,
- oldVal,
- null,
- keepBinary,
- c.updateRes.updateCounter()));
- }
+ cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+ cctx,
+ key,
+ null,
+ oldVal,
+ null,
+ keepBinary,
+ c.updateRes.updateCounter()));
}
}
}
@@ -1872,6 +1870,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return Result.
*/
private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+ assert !obsolete();
+
boolean rmv = false;
// 1. If TTL is not changed, then calculate it based on expiry.
@@ -3980,7 +3980,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheInvokeEntry<Object, Object> invokeEntry = null;
IgniteBiTuple<Object, Exception> invokeRes = null;
- if (op == TRANSFORM) {
+ boolean invoke = op == TRANSFORM;
+
+ if (invoke) {
invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry);
invokeRes = runEntryProcessor(invokeEntry);
@@ -3990,7 +3992,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject newVal = (CacheObject)writeObj;
- GridCacheVersionConflictContext<?, ?> conflictCtx;
+ GridCacheVersionConflictContext<?, ?> conflictCtx = null;
if (conflictResolve) {
conflictCtx = resolveConflict(newVal, invokeRes);
@@ -4002,9 +4004,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return;
}
}
- else {
- conflictCtx = null;
+ if (conflictCtx == null) {
// Perform version check only in case there was no explicit conflict resolution.
versionCheck(invokeRes);
@@ -4019,10 +4020,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean pass = cctx.isAllLocked(entry, filter);
if (!pass) {
- // TODO
-// if (expiryPlc != null && !readFromStore && entry.val != null && !cctx.putIfAbsentFilter(filter))
-// updateTtl(expiryPlc);
- treeOp = IgniteTree.OperationType.NOOP;
+ initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter));
updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED,
oldVal,
@@ -4038,14 +4036,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
- if (op == TRANSFORM) {
- assert invokeEntry != null;
-
+ if (invoke) {
if (!invokeEntry.modified()) {
- // TODO
-// if (expiryPlc != null && !readFromStore && entry.val != null)
-// updateTtl(expiryPlc);
- treeOp = IgniteTree.OperationType.NOOP;
+ initResultOnCancelUpdate(storeLoadedVal, true);
updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP,
oldVal,
@@ -4076,24 +4069,90 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (op == UPDATE) {
assert writeObj != null;
- update(conflictCtx, invokeRes);
+ update(conflictCtx, invokeRes, storeLoadedVal != null);
}
else {
assert op == DELETE && writeObj == null : op;
- remove(conflictCtx, invokeRes);
+ remove(conflictCtx, invokeRes, storeLoadedVal != null);
}
assert updateRes != null && treeOp != null;
}
/**
+ * Initializes tree operation for a cancelled update (filter failed or entry processor made no changes).
+ *
+ * @param storeLoadedVal Value loaded from store.
+ * @param updateExpireTime {@code True} if expire time needs to be updated.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime)
+ throws IgniteCheckedException {
+ CacheObject rowVal = null; // Value for the row to write; stays null when there is nothing to write.
+
+ if (storeLoadedVal != null) {
+ long initTtl;
+ long initExpireTime;
+
+ if (expiryPlc != null) {
+ IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+ initTtl = initTtlAndExpireTime.get1();
+ initExpireTime = initTtlAndExpireTime.get2();
+ }
+ else {
+ initTtl = CU.TTL_ETERNAL;
+ initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ }
+
+ entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true);
+
+ rowVal = storeLoadedVal;
+ }
+ else if (updateExpireTime && expiryPlc != null && entry.val != null) {
+ long ttl = expiryPlc.forAccess();
+
+ if (ttl != CU.TTL_NOT_CHANGED) {
+ long expireTime;
+
+ if (ttl == CU.TTL_ZERO) {
+ ttl = CU.TTL_MINIMUM;
+ expireTime = CU.expireTimeInPast();
+ }
+ else
+ expireTime = CU.toExpireTime(ttl);
+
+ if (entry.expireTimeExtras() != expireTime) {
+ entry.update(entry.val, expireTime, ttl, entry.ver, true);
+
+ expiryPlc.ttlUpdated(entry.key, entry.ver, null);
+
+ // Only expire time changed, so the new row keeps the current value (storeLoadedVal is null here).
+ rowVal = entry.val;
+ }
+ }
+ }
+
+ if (rowVal != null) {
+ newRow = entry.localPartition().dataStore().createRow(entry.key, rowVal, newVer,
+ entry.expireTimeExtras(), oldRow);
+
+ treeOp = IgniteTree.OperationType.PUT;
+ }
+ else
+ treeOp = IgniteTree.OperationType.NOOP;
+ }
+
+ /**
* @param conflictCtx Conflict context.
* @param invokeRes Entry processor result (for invoke operation).
+ * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
* @throws IgniteCheckedException If failed.
*/
private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
- @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+ boolean readFromStore)
throws IgniteCheckedException
{
GridCacheContext cctx = entry.context();
@@ -4134,7 +4193,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
writeObj = null;
- remove(conflictCtx, invokeRes);
+ remove(conflictCtx, invokeRes, readFromStore);
return;
}
@@ -4151,12 +4210,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (intercept) {
- Object updated0 = entry.value(null, updated, keepBinary, false);
+ Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false);
CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
entry.key,
null,
- entry.val,
+ oldVal,
null,
keepBinary);
@@ -4174,6 +4233,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
0);
+
+ return;
}
else if (interceptorVal != updated0) {
updated0 = cctx.unwrapTemporary(interceptorVal);
@@ -4238,11 +4299,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param conflictCtx Conflict context.
* @param invokeRes Entry processor result (for invoke operation).
+ * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
- @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+ boolean readFromStore)
throws IgniteCheckedException
{
GridCacheContext cctx = entry.context();
@@ -4255,7 +4318,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx,
entry.key,
null,
- oldVal, null,
+ oldVal,
+ null,
keepBinary);
interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry);
@@ -4308,7 +4372,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
- treeOp = oldVal == null ? IgniteTree.OperationType.NOOP : IgniteTree.OperationType.REMOVE;
+ treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP :
+ IgniteTree.OperationType.REMOVE;
UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c4976dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 62a5cc3..eed9f09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1057,6 +1057,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ if (oldRow == null)
+ storageSize.incrementAndGet();
+
KeyCacheObject key = newRow.key();
long expireTime = newRow.expireTime();
@@ -1093,8 +1096,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (newRow.link() != oldRow.link())
rowStore.removeRow(oldRow.link());
}
- else
- storageSize.incrementAndGet();
if (pendingEntries != null && expireTime != 0)
pendingEntries.putx(new PendingRow(expireTime, newRow.link()));