You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/09 11:46:55 UTC

ignite git commit: ignite-4652 Atomic update refactoring to use BPlusTree.invoke.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4652 f979979a1 -> c4976dd26


ignite-4652 Atomic update refactoring to use BPlusTree.invoke.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c4976dd2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c4976dd2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c4976dd2

Branch: refs/heads/ignite-4652
Commit: c4976dd2603461f359485c2a8189ce839ab0b1c1
Parents: f979979
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 9 13:29:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 9 14:46:26 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 281 ++++++++++++-------
 .../cache/IgniteCacheOffheapManagerImpl.java    |   5 +-
 2 files changed, 176 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c4976dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d1e07c6..5b7f4a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1714,108 +1714,106 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     keepBinary);
             }
 
-            if (updateRes.success()) {
-                if (c.op == GridCacheOperation.UPDATE) {
-                    assert (isNear() && val != null) || c.newRow != null : c;
-
-                    updateVal = isNear() ? val : c.newRow.value();
+            if (c.op == GridCacheOperation.UPDATE) {
+                updateVal = val;
 
-                    assert updateVal != null : c;
+                assert updateVal != null : c;
 
-                    drReplicate(drType, updateVal, updateVer, topVer);
+                drReplicate(drType, updateVal, updateVer, topVer);
 
-                    recordNodeId(affNodeId, topVer);
+                recordNodeId(affNodeId, topVer);
 
-                    if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
-                        if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(oldVal);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+                    if (evtOld == null)
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
-                        cctx.events().addEvent(partition(),
-                            key,
-                            evtNodeId,
-                            null,
-                            newVer,
-                            EVT_CACHE_OBJECT_PUT,
-                            updateVal,
-                            true,
-                            evtOld,
-                            evtOld != null,
-                            subjId,
-                            null,
-                            taskName,
-                            keepBinary);
-                    }
+                    cctx.events().addEvent(partition(),
+                        key,
+                        evtNodeId,
+                        null,
+                        newVer,
+                        EVT_CACHE_OBJECT_PUT,
+                        updateVal,
+                        true,
+                        evtOld,
+                        evtOld != null,
+                        subjId,
+                        null,
+                        taskName,
+                        keepBinary);
                 }
-                else {
-                    assert c.op == GridCacheOperation.DELETE : c.op;
+            }
+            else {
+                assert c.op == GridCacheOperation.DELETE : c.op;
 
-                    clearReaders();
+                clearReaders();
 
-                    drReplicate(drType, null, newVer, topVer);
+                drReplicate(drType, null, newVer, topVer);
 
-                    recordNodeId(affNodeId, topVer);
+                recordNodeId(affNodeId, topVer);
 
-                    if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
-                        if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(oldVal);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                    if (evtOld == null)
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
-                        cctx.events().addEvent(partition(),
-                            key,
-                            evtNodeId,
-                            null, newVer,
-                            EVT_CACHE_OBJECT_REMOVED,
-                            null, false,
-                            evtOld, evtOld != null,
-                            subjId,
-                            null,
-                            taskName,
-                            keepBinary);
-                    }
+                    cctx.events().addEvent(partition(),
+                        key,
+                        evtNodeId,
+                        null, newVer,
+                        EVT_CACHE_OBJECT_REMOVED,
+                        null, false,
+                        evtOld, evtOld != null,
+                        subjId,
+                        null,
+                        taskName,
+                        keepBinary);
                 }
+            }
 
-                updateMetrics(c.op, metrics);
+            updateMetrics(c.op, metrics);
 
-                // Continuous query filter should be perform under lock.
-                if (lsnrs != null) {
-                    CacheObject evtVal = cctx.unwrapTemporary(updateVal);
-                    CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
+            // Continuous query filter should be perform under lock.
+            if (lsnrs != null) {
+                CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+                CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
 
-                    cctx.continuousQueries().onEntryUpdated(lsnrs,
+                cctx.continuousQueries().onEntryUpdated(lsnrs,
+                    key,
+                    evtVal,
+                    evtOldVal,
+                    internal,
+                    partition(),
+                    primary,
+                    false,
+                    c.updateRes.updateCounter(),
+                    fut,
+                    topVer);
+            }
+
+            cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
+
+            if (intercept) {
+                if (c.op == GridCacheOperation.UPDATE) {
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+                        cctx,
                         key,
-                        evtVal,
-                        evtOldVal,
-                        internal,
-                        partition(),
-                        primary,
-                        false,
-                        c.updateRes.updateCounter(),
-                        fut,
-                        topVer);
+                        null,
+                        updateVal,
+                        null,
+                        keepBinary,
+                        c.updateRes.updateCounter()));
                 }
+                else {
+                    assert c.op == GridCacheOperation.DELETE : c.op;
 
-                cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
-
-                if (intercept) {
-                    if (op == GridCacheOperation.UPDATE) {
-                        cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
-                            cctx,
-                            key,
-                            null,
-                            updateVal,
-                            null,
-                            keepBinary,
-                            c.updateRes.updateCounter()));
-                    }
-                    else {
-                        cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
-                            cctx,
-                            key,
-                            null,
-                            oldVal,
-                            null,
-                            keepBinary,
-                            c.updateRes.updateCounter()));
-                    }
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        null,
+                        oldVal,
+                        null,
+                        keepBinary,
+                        c.updateRes.updateCounter()));
                 }
             }
         }
@@ -1872,6 +1870,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @return Result.
      */
     private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+        assert !obsolete();
+
         boolean rmv = false;
 
         // 1. If TTL is not changed, then calculate it based on expiry.
@@ -3980,7 +3980,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             CacheInvokeEntry<Object, Object> invokeEntry = null;
             IgniteBiTuple<Object, Exception> invokeRes = null;
 
-            if (op == TRANSFORM) {
+            boolean invoke = op == TRANSFORM;
+
+            if (invoke) {
                 invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry);
 
                 invokeRes = runEntryProcessor(invokeEntry);
@@ -3990,7 +3992,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             CacheObject newVal = (CacheObject)writeObj;
 
-            GridCacheVersionConflictContext<?, ?> conflictCtx;
+            GridCacheVersionConflictContext<?, ?> conflictCtx = null;
 
             if (conflictResolve) {
                 conflictCtx = resolveConflict(newVal, invokeRes);
@@ -4002,9 +4004,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     return;
                 }
             }
-            else {
-                conflictCtx = null;
 
+            if (conflictCtx == null) {
                 // Perform version check only in case there was no explicit conflict resolution.
                 versionCheck(invokeRes);
 
@@ -4019,10 +4020,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 boolean pass = cctx.isAllLocked(entry, filter);
 
                 if (!pass) {
-                    // TODO
-//                        if (expiryPlc != null && !readFromStore && entry.val != null && !cctx.putIfAbsentFilter(filter))
-//                            updateTtl(expiryPlc);
-                    treeOp = IgniteTree.OperationType.NOOP;
+                    initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter));
 
                     updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED,
                         oldVal,
@@ -4038,14 +4036,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
-            if (op == TRANSFORM) {
-                assert invokeEntry != null;
-
+            if (invoke) {
                 if (!invokeEntry.modified()) {
-                    // TODO
-//                        if (expiryPlc != null && !readFromStore && entry.val != null)
-//                            updateTtl(expiryPlc);
-                    treeOp = IgniteTree.OperationType.NOOP;
+                    initResultOnCancelUpdate(storeLoadedVal, true);
 
                     updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP,
                         oldVal,
@@ -4076,24 +4069,90 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (op == UPDATE) {
                 assert writeObj != null;
 
-                update(conflictCtx, invokeRes);
+                update(conflictCtx, invokeRes, storeLoadedVal != null);
             }
             else {
                 assert op == DELETE && writeObj == null : op;
 
-                remove(conflictCtx, invokeRes);
+                remove(conflictCtx, invokeRes, storeLoadedVal != null);
             }
 
             assert updateRes != null && treeOp != null;
         }
 
         /**
+         * @param storeLoadedVal Value loaded from store.
+         * @param updateExpireTime {@code True} if need update expire time.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime)
+            throws IgniteCheckedException {
+            boolean needUpdate = false;
+
+            if (storeLoadedVal != null) {
+                long initTtl;
+                long initExpireTime;
+
+                if (expiryPlc != null) {
+                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+                    initTtl = initTtlAndExpireTime.get1();
+                    initExpireTime = initTtlAndExpireTime.get2();
+                }
+                else {
+                    initTtl = CU.TTL_ETERNAL;
+                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                }
+
+                entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true);
+
+                needUpdate = true;
+            }
+            else if (updateExpireTime && expiryPlc != null && entry.val != null){
+                long ttl = expiryPlc.forAccess();
+
+                if (ttl != CU.TTL_NOT_CHANGED) {
+                    long expireTime;
+
+                    if (ttl == CU.TTL_ZERO) {
+                        ttl = CU.TTL_MINIMUM;
+                        expireTime = CU.expireTimeInPast();
+                    }
+                    else
+                        expireTime = CU.toExpireTime(ttl);
+
+                    if (entry.expireTimeExtras() != expireTime) {
+                        entry.update(entry.val, expireTime, ttl, entry.ver, true);
+
+                        expiryPlc.ttlUpdated(entry.key, entry.ver, null);
+
+                        needUpdate = true;
+                    }
+                }
+            }
+
+            if (needUpdate) {
+                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                    storeLoadedVal,
+                    newVer,
+                    entry.expireTimeExtras(),
+                    oldRow);
+
+                treeOp = IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = IgniteTree.OperationType.NOOP;
+        }
+
+        /**
          * @param conflictCtx Conflict context.
          * @param invokeRes Entry processor result (for invoke operation).
+         * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
          * @throws IgniteCheckedException If failed.
          */
         private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
-            @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+            boolean readFromStore)
             throws IgniteCheckedException
         {
             GridCacheContext cctx = entry.context();
@@ -4134,7 +4193,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                         writeObj = null;
 
-                        remove(conflictCtx, invokeRes);
+                        remove(conflictCtx, invokeRes, readFromStore);
 
                         return;
                     }
@@ -4151,12 +4210,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (intercept) {
-                Object updated0 = entry.value(null, updated, keepBinary, false);
+                Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false);
 
                 CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
                     entry.key,
                     null,
-                    entry.val,
+                    oldVal,
                     null,
                     keepBinary);
 
@@ -4174,6 +4233,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         0);
+
+                    return;
                 }
                 else if (interceptorVal != updated0) {
                     updated0 = cctx.unwrapTemporary(interceptorVal);
@@ -4238,11 +4299,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         /**
          * @param conflictCtx Conflict context.
          * @param invokeRes Entry processor result (for invoke operation).
+         * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
          * @throws IgniteCheckedException If failed.
          */
         @SuppressWarnings("unchecked")
         private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
-            @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+            boolean readFromStore)
             throws IgniteCheckedException
         {
             GridCacheContext cctx = entry.context();
@@ -4255,7 +4318,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx,
                     entry.key,
                     null,
-                    oldVal, null,
+                    oldVal,
+                    null,
                     keepBinary);
 
                 interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry);
@@ -4308,7 +4372,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
 
-            treeOp = oldVal == null ? IgniteTree.OperationType.NOOP : IgniteTree.OperationType.REMOVE;
+            treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP :
+                IgniteTree.OperationType.REMOVE;
 
             UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c4976dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 62a5cc3..eed9f09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1057,6 +1057,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
          * @throws IgniteCheckedException If failed.
          */
         private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            if (oldRow == null)
+                storageSize.incrementAndGet();
+
             KeyCacheObject key = newRow.key();
 
             long expireTime = newRow.expireTime();
@@ -1093,8 +1096,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
                 if (newRow.link() != oldRow.link())
                     rowStore.removeRow(oldRow.link());
             }
-            else
-                storageSize.incrementAndGet();
 
             if (pendingEntries != null && expireTime != 0)
                 pendingEntries.putx(new PendingRow(expireTime, newRow.link()));