You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/08 09:29:24 UTC

[6/7] ignite git commit: ignite-4652 Atomic update refactoring to use BPlusTree.invoke

ignite-4652 Atomic update refactoring to use BPlusTree.invoke


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8785c00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8785c00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8785c00

Branch: refs/heads/ignite-4652
Commit: b8785c00d67625903f54e349d9c2071ee5f6870f
Parents: 0cf2cf9
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 8 11:32:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 8 12:19:33 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 82 +++++++++++---------
 .../cache/IgniteCacheOffheapManagerImpl.java    | 13 +++-
 .../cache/database/CacheDataRowAdapter.java     | 13 ++++
 3 files changed, 69 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b8785c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 3406fb2..d1e07c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -1607,7 +1608,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             key.valueBytes(cctx.cacheObjectContext());
 
-            cctx.offheap().invoke(key, localPartition(), c);
+            if (isNear()) {
+                CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
+
+                c.call(dataRow);
+            }
+            else
+                cctx.offheap().invoke(key, localPartition(), c);
 
             GridCacheUpdateAtomicResult updateRes = c.updateRes;
 
@@ -1686,34 +1693,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             CacheObject evtOld = null;
 
-            if (evt) {
-                Object transformClo = op == TRANSFORM ? writeObj : null;
+            if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                assert writeObj instanceof EntryProcessor : writeObj;
 
-                if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                    evtOld = cctx.unwrapTemporary(oldVal);
+                evtOld = cctx.unwrapTemporary(oldVal);
 
-                    transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+                Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
 
-                    cctx.events().addEvent(partition(),
-                        key,
-                        evtNodeId,
-                        null,
-                        newVer,
-                        EVT_CACHE_OBJECT_READ,
-                        evtOld, evtOld != null,
-                        evtOld, evtOld != null,
-                        subjId,
-                        transformClo.getClass().getName(),
-                        taskName,
-                        keepBinary);
-                }
+                cctx.events().addEvent(partition(),
+                    key,
+                    evtNodeId,
+                    null,
+                    newVer,
+                    EVT_CACHE_OBJECT_READ,
+                    evtOld, evtOld != null,
+                    evtOld, evtOld != null,
+                    subjId,
+                    transformClo.getClass().getName(),
+                    taskName,
+                    keepBinary);
             }
 
             if (updateRes.success()) {
                 if (c.op == GridCacheOperation.UPDATE) {
-                    assert c.newRow != null : c;
+                    assert (isNear() && val != null) || c.newRow != null : c;
 
-                    updateVal = c.newRow.value();
+                    updateVal = isNear() ? val : c.newRow.value();
 
                     assert updateVal != null : c;
 
@@ -3942,7 +3947,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         /** {@inheritDoc} */
         @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
-            assert oldRow == null || oldRow.link() != 0 : oldRow;
+            assert entry.isNear() || oldRow == null || oldRow.link() != 0 : oldRow;
 
             this.oldRow = oldRow;
 
@@ -4068,8 +4073,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     conflictVer);
             }
 
-            if (op == UPDATE)
+            if (op == UPDATE) {
+                assert writeObj != null;
+
                 update(conflictCtx, invokeRes);
+            }
             else {
                 assert op == DELETE && writeObj == null : op;
 
@@ -4124,13 +4132,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     else if (newSysTtl == CU.TTL_ZERO) {
                         op = GridCacheOperation.DELETE;
 
-                        newSysTtl = CU.TTL_NOT_CHANGED;
-                        newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                        writeObj = null;
 
-                        newTtl = CU.TTL_ETERNAL;
-                        newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                        remove(conflictCtx, invokeRes);
 
-                        updated = null;
+                        return;
                     }
                     else {
                         newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
@@ -4203,16 +4209,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
 
-            newRow = entry.localPartition().dataStore().createRow(entry.key,
-                updated,
-                newVer,
-                newExpireTime,
-                oldRow);
+            if (!entry.isNear()) {
+                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                    updated,
+                    newVer,
+                    newExpireTime,
+                    oldRow);
 
-            entry.update(updated, newExpireTime, newTtl, newVer, true);
+                treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                    IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = IgniteTree.OperationType.PUT;
 
-            treeOp = oldRow != null && oldRow.link() == newRow.link() ?
-                IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+            entry.update(updated, newExpireTime, newTtl, newVer, true);
 
             updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS,
                 oldVal,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8785c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 99bd134..62a5cc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -907,6 +907,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
             if (oldRow == null || indexingEnabled)
                 return false;
 
+            if (oldRow.expireTime() != dataRow.expireTime())
+                return false;
+
             CacheObjectContext coCtx = cctx.cacheObjectContext();
 
             int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx);
@@ -926,7 +929,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
                 throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
             try {
-                // FIXME IGNITE-4652..
+                // FIXME IGNITE-4652.
                 final boolean FAKE_INVOKE = true;
 
                 if (FAKE_INVOKE) {
@@ -975,8 +978,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
-        @Override public CacheDataRow createRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow)
-            throws IgniteCheckedException {
+        @Override public CacheDataRow createRow(KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            @Nullable CacheDataRow oldRow) throws IgniteCheckedException
+        {
             DataRow dataRow = new DataRow(key, val, ver, partId, expireTime);
 
             if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8785c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 4bfdd99..5a62e75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -73,6 +73,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @param expireTime Expire time.
+     */
+    public CacheDataRowAdapter(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime) {
+        this.key = key;
+        this.val = val;
+        this.ver = ver;
+        this.expireTime = expireTime;
+    }
+
+    /**
      * Read row from data pages.
      *
      * @param cctx Cache context.