You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/08 09:29:24 UTC
[6/7] ignite git commit: ignite-4652 Atomic update refactoring to use
BPlusTree.invoke
ignite-4652 Atomic update refactoring to use BPlusTree.invoke
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8785c00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8785c00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8785c00
Branch: refs/heads/ignite-4652
Commit: b8785c00d67625903f54e349d9c2071ee5f6870f
Parents: 0cf2cf9
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 8 11:32:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 8 12:19:33 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 82 +++++++++++---------
.../cache/IgniteCacheOffheapManagerImpl.java | 13 +++-
.../cache/database/CacheDataRowAdapter.java | 13 ++++
3 files changed, 69 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8785c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 3406fb2..d1e07c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -1607,7 +1608,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
key.valueBytes(cctx.cacheObjectContext());
- cctx.offheap().invoke(key, localPartition(), c);
+ if (isNear()) {
+ CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
+
+ c.call(dataRow);
+ }
+ else
+ cctx.offheap().invoke(key, localPartition(), c);
GridCacheUpdateAtomicResult updateRes = c.updateRes;
@@ -1686,34 +1693,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject evtOld = null;
- if (evt) {
- Object transformClo = op == TRANSFORM ? writeObj : null;
+ if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ assert writeObj instanceof EntryProcessor : writeObj;
- if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- evtOld = cctx.unwrapTemporary(oldVal);
+ evtOld = cctx.unwrapTemporary(oldVal);
- transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
+ Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
- cctx.events().addEvent(partition(),
- key,
- evtNodeId,
- null,
- newVer,
- EVT_CACHE_OBJECT_READ,
- evtOld, evtOld != null,
- evtOld, evtOld != null,
- subjId,
- transformClo.getClass().getName(),
- taskName,
- keepBinary);
- }
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
+ null,
+ newVer,
+ EVT_CACHE_OBJECT_READ,
+ evtOld, evtOld != null,
+ evtOld, evtOld != null,
+ subjId,
+ transformClo.getClass().getName(),
+ taskName,
+ keepBinary);
}
if (updateRes.success()) {
if (c.op == GridCacheOperation.UPDATE) {
- assert c.newRow != null : c;
+ assert (isNear() && val != null) || c.newRow != null : c;
- updateVal = c.newRow.value();
+ updateVal = isNear() ? val : c.newRow.value();
assert updateVal != null : c;
@@ -3942,7 +3947,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** {@inheritDoc} */
@Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
- assert oldRow == null || oldRow.link() != 0 : oldRow;
+ assert entry.isNear() || oldRow == null || oldRow.link() != 0 : oldRow;
this.oldRow = oldRow;
@@ -4068,8 +4073,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
conflictVer);
}
- if (op == UPDATE)
+ if (op == UPDATE) {
+ assert writeObj != null;
+
update(conflictCtx, invokeRes);
+ }
else {
assert op == DELETE && writeObj == null : op;
@@ -4124,13 +4132,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else if (newSysTtl == CU.TTL_ZERO) {
op = GridCacheOperation.DELETE;
- newSysTtl = CU.TTL_NOT_CHANGED;
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ writeObj = null;
- newTtl = CU.TTL_ETERNAL;
- newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ remove(conflictCtx, invokeRes);
- updated = null;
+ return;
}
else {
newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
@@ -4203,16 +4209,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
- newRow = entry.localPartition().dataStore().createRow(entry.key,
- updated,
- newVer,
- newExpireTime,
- oldRow);
+ if (!entry.isNear()) {
+ newRow = entry.localPartition().dataStore().createRow(entry.key,
+ updated,
+ newVer,
+ newExpireTime,
+ oldRow);
- entry.update(updated, newExpireTime, newTtl, newVer, true);
+ treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ }
+ else
+ treeOp = IgniteTree.OperationType.PUT;
- treeOp = oldRow != null && oldRow.link() == newRow.link() ?
- IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+ entry.update(updated, newExpireTime, newTtl, newVer, true);
updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS,
oldVal,
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8785c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 99bd134..62a5cc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -907,6 +907,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (oldRow == null || indexingEnabled)
return false;
+ if (oldRow.expireTime() != dataRow.expireTime())
+ return false;
+
CacheObjectContext coCtx = cctx.cacheObjectContext();
int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx);
@@ -926,7 +929,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- // FIXME IGNITE-4652..
+ // FIXME IGNITE-4652.
final boolean FAKE_INVOKE = true;
if (FAKE_INVOKE) {
@@ -975,8 +978,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public CacheDataRow createRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow)
- throws IgniteCheckedException {
+ @Override public CacheDataRow createRow(KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ @Nullable CacheDataRow oldRow) throws IgniteCheckedException
+ {
DataRow dataRow = new DataRow(key, val, ver, partId, expireTime);
if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8785c00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 4bfdd99..5a62e75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -73,6 +73,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/**
+ * @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ */
+ public CacheDataRowAdapter(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime) {
+ this.key = key;
+ this.val = val;
+ this.ver = ver;
+ this.expireTime = expireTime;
+ }
+
+ /**
* Read row from data pages.
*
* @param cctx Cache context.