You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/07 15:14:05 UTC
[2/2] ignite git commit: ignite-4652 Atomic update refactoring to use
BPlusTree.invoke
ignite-4652 Atomic update refactoring to use BPlusTree.invoke
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0cf2cf93
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0cf2cf93
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0cf2cf93
Branch: refs/heads/ignite-4652
Commit: 0cf2cf93228f146818874f7a287cabf58ce57f64
Parents: 42b7f47
Author: sboikov <sb...@gridgain.com>
Authored: Tue Feb 7 16:37:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Feb 7 18:13:34 2017 +0300
----------------------------------------------------------------------
.../internal/pagemem/wal/record/DataRecord.java | 10 +-
.../processors/cache/GridCacheMapEntry.java | 1699 ++++++++++--------
.../cache/GridCacheUpdateAtomicResult.java | 96 +-
.../cache/IgniteCacheOffheapManager.java | 42 +
.../cache/IgniteCacheOffheapManagerImpl.java | 214 ++-
.../distributed/dht/GridDhtCacheEntry.java | 5 +
6 files changed, 1289 insertions(+), 777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0cf2cf93/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 6592852..d2747f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -17,15 +17,10 @@
package org.apache.ignite.internal.pagemem.wal.record;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
*
@@ -68,6 +63,7 @@ public class DataRecord extends WALRecord {
return writeEntries == null ? Collections.<DataEntry>emptyList() : writeEntries;
}
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataRecord.class, this, super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0cf2cf93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6dc1d04..3406fb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -83,6 +85,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
@@ -1535,11 +1539,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public GridCacheUpdateAtomicResult innerUpdate(
- GridCacheVersion newVer,
+ final GridCacheVersion newVer,
final UUID evtNodeId,
final UUID affNodeId,
- GridCacheOperation op,
- @Nullable Object writeObj,
+ final GridCacheOperation op,
+ @Nullable final Object writeObj,
@Nullable final Object[] invokeArgs,
final boolean writeThrough,
final boolean readThrough,
@@ -1555,42 +1559,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
final GridDrType drType,
final long explicitTtl,
final long explicitExpireTime,
- @Nullable GridCacheVersion conflictVer,
+ @Nullable final GridCacheVersion conflictVer,
final boolean conflictResolve,
final boolean intercept,
@Nullable final UUID subjId,
final String taskName,
@Nullable final CacheObject prevVal,
@Nullable final Long updateCntr,
- @Nullable GridDhtAtomicAbstractUpdateFuture fut
+ @Nullable final GridDhtAtomicAbstractUpdateFuture fut
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
- assert cctx.atomic();
-
- boolean res = true;
-
- CacheObject oldVal;
- CacheObject updated;
-
- GridCacheVersion enqueueVer = null;
+ assert cctx.atomic() && !detached();
- GridCacheVersionConflictContext<?, ?> conflictCtx = null;
-
- IgniteBiTuple<Object, Exception> invokeRes = null;
-
- // System TTL/ET which may have special values.
- long newSysTtl;
- long newSysExpireTime;
-
- // TTL/ET which will be passed to entry on update.
- long newTtl;
- long newExpireTime;
-
- Object key0 = null;
- Object updated0 = null;
-
- Long updateCntr0 = null;
+ AtomicCacheUpdateClosure c;
synchronized (this) {
+ checkObsolete();
+
boolean internal = isInternal() || !context().userCache();
Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
@@ -1598,750 +1582,337 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
|| !F.isEmptyOrNulls(filter);
- checkObsolete();
-
- CacheDataRow oldRow = null;
+ // Possibly read value from store.
+ boolean readFromStore = readThrough && needVal && (cctx.readThrough() &&
+ (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
+
+ c = new AtomicCacheUpdateClosure(this,
+ newVer,
+ op,
+ writeObj,
+ invokeArgs,
+ readFromStore,
+ writeThrough,
+ keepBinary,
+ expiryPlc,
+ primary,
+ verCheck,
+ filter,
+ explicitTtl,
+ explicitExpireTime,
+ conflictVer,
+ conflictResolve,
+ intercept,
+ updateCntr);
+
+ key.valueBytes(cctx.cacheObjectContext());
+
+ cctx.offheap().invoke(key, localPartition(), c);
+
+ GridCacheUpdateAtomicResult updateRes = c.updateRes;
+
+ assert updateRes != null : c;
+
+ CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null;
+ CacheObject updateVal = null;
+ GridCacheVersion updateVer = c.newVer;
- // Load and remove from swap if it is new.
- if (isStartVersion())
- oldRow = unswap(retval, false);
+ // Apply metrics.
+ if (metrics &&
+ updateRes.outcome().updateReadMetrics() &&
+ cctx.cache().configuration().isStatisticsEnabled() &&
+ needVal) {
+ // PutIfAbsent methods must not update hit/miss statistics.
+ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
+ cctx.cache().metrics0().onRead(oldVal != null);
+ }
- // Prepare old value.
- oldVal = val;
+ switch (updateRes.outcome()) {
+ case VERSION_CHECK_FAILED: {
+ if (!cctx.isNear()) {
+ CacheObject evtVal;
- // Possibly read value from store.
- boolean readFromStore = false;
+ if (op == GridCacheOperation.TRANSFORM) {
+ EntryProcessor<Object, Object, ?> entryProcessor =
+ (EntryProcessor<Object, Object, ?>)writeObj;
- Object old0 = null;
+ CacheInvokeEntry<Object, Object> entry =
+ new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
- if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
- (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
- old0 = readThrough(null, key, false, subjId, taskName);
+ try {
+ entryProcessor.process(entry, invokeArgs);
- oldVal = cctx.toCacheObject(old0);
+ evtVal = entry.modified() ?
+ cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+ }
+ catch (Exception ignore) {
+ evtVal = prevVal;
+ }
+ }
+ else
+ evtVal = (CacheObject)writeObj;
- readFromStore = true;
+ long updateCntr0 = nextPartCounter();
- // Detach value before index update.
- oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
- // Calculate initial TTL and expire time.
- long initTtl;
- long initExpireTime;
+ onUpdateFinished(updateCntr0);
- if (expiryPlc != null && oldVal != null) {
- IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+ cctx.continuousQueries().onEntryUpdated(
+ key,
+ evtVal,
+ prevVal,
+ isInternal() || !context().userCache(),
+ partition(),
+ primary,
+ false,
+ updateCntr0,
+ null,
+ topVer);
+ }
- initTtl = initTtlAndExpireTime.get1();
- initExpireTime = initTtlAndExpireTime.get2();
- }
- else {
- initTtl = CU.TTL_ETERNAL;
- initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ return updateRes;
}
- if (oldVal != null)
- storeValue(oldVal, initExpireTime, ver, oldRow);
- // else nothing to do, real old value was null.
-
- update(oldVal, initExpireTime, initTtl, ver, true);
-
- if (deletedUnlocked() && oldVal != null && !isInternal())
- deletedUnlocked(false);
+ case CONFLICT_USE_OLD:
+ case FILTER_FAILED:
+ case INVOKE_NO_OP:
+ case INTERCEPTOR_CANCEL:
+ return updateRes;
}
- Object transformClo = null;
-
- // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
- if (conflictResolve) {
- GridCacheVersion oldConflictVer = version().conflictVersion();
-
- // Cache is conflict-enabled.
- if (cctx.conflictNeedResolve()) {
- GridCacheVersionedEntryEx newEntry;
-
- GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
- explicitTtl,
- explicitExpireTime);
+ assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL;
- // Prepare old and new entries for conflict resolution.
- GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
+ CacheObject evtOld = null;
- if (op == GridCacheOperation.TRANSFORM) {
- transformClo = writeObj;
+ if (evt) {
+ Object transformClo = op == TRANSFORM ? writeObj : null;
- EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+ if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ evtOld = cctx.unwrapTemporary(oldVal);
- oldVal = this.val;
-
- CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(),
- keepBinary, this);
-
- try {
- Object computed = entryProcessor.process(entry, invokeArgs);
-
- if (entry.modified())
- writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
- else
- writeObj = oldVal;
-
- key0 = entry.key();
-
- if (computed != null)
- invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
- }
- catch (Exception e) {
- invokeRes = new IgniteBiTuple(null, e);
-
- writeObj = oldVal;
- }
- }
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
- newEntry = new GridCacheLazyPlainVersionedEntry<>(
- cctx,
+ cctx.events().addEvent(partition(),
key,
- (CacheObject)writeObj,
- expiration.get1(),
- expiration.get2(),
- conflictVer != null ? conflictVer : newVer,
+ evtNodeId,
+ null,
+ newVer,
+ EVT_CACHE_OBJECT_READ,
+ evtOld, evtOld != null,
+ evtOld, evtOld != null,
+ subjId,
+ transformClo.getClass().getName(),
+ taskName,
keepBinary);
+ }
+ }
- // Resolve conflict.
- conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
-
- assert conflictCtx != null;
+ if (updateRes.success()) {
+ if (c.op == GridCacheOperation.UPDATE) {
+ assert c.newRow != null : c;
- boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+ updateVal = c.newRow.value();
- // Use old value?
- if (conflictCtx.isUseOld()) {
- GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+ assert updateVal != null : c;
- // Handle special case with atomic comparator.
- if (!isNew() && // Not initial value,
- verCheck && // and atomic version check,
- oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
- ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
- cctx.writeThrough() && // and store is enabled,
- primary) // and we are primary.
- {
- CacheObject val = this.val;
+ drReplicate(drType, updateVal, updateVer, topVer);
- if (val == null) {
- assert deletedUnlocked();
+ recordNodeId(affNodeId, topVer);
- cctx.store().remove(null, key);
- }
- else
- cctx.store().put(null, key, val, ver);
- }
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+ if (evtOld == null)
+ evtOld = cctx.unwrapTemporary(oldVal);
- return new GridCacheUpdateAtomicResult(false,
- retval ? this.val : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
null,
+ newVer,
+ EVT_CACHE_OBJECT_PUT,
+ updateVal,
+ true,
+ evtOld,
+ evtOld != null,
+ subjId,
null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- }
- // Will update something.
- else {
- // Merge is a local update which override passed value bytes.
- if (conflictCtx.isMerge()) {
- writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
-
- conflictVer = null;
- }
- else
- assert conflictCtx.isUseNew();
-
- // Update value is known at this point, so update operation type.
- op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+ taskName,
+ keepBinary);
}
}
- else
- // Nullify conflict version on this update, so that we will use regular version during next updates.
- conflictVer = null;
- }
-
- boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+ else {
+ assert c.op == GridCacheOperation.DELETE : c.op;
- // Perform version check only in case there was no explicit conflict resolution.
- if (conflictCtx == null) {
- if (verCheck) {
- if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
- if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
- if (log.isDebugEnabled())
- log.debug("Received entry update with same version as current (will update store) " +
- "[entry=" + this + ", newVer=" + newVer + ']');
+ clearReaders();
- CacheObject val = this.val;
+ drReplicate(drType, null, newVer, topVer);
- if (val == null) {
- assert deletedUnlocked();
+ recordNodeId(affNodeId, topVer);
- cctx.store().remove(null, key);
- }
- else
- cctx.store().put(null, key, val, ver);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Received entry update with smaller version than current (will ignore) " +
- "[entry=" + this + ", newVer=" + newVer + ']');
- }
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+ if (evtOld == null)
+ evtOld = cctx.unwrapTemporary(oldVal);
- if (!cctx.isNear()) {
- CacheObject evtVal;
+ cctx.events().addEvent(partition(),
+ key,
+ evtNodeId,
+ null, newVer,
+ EVT_CACHE_OBJECT_REMOVED,
+ null, false,
+ evtOld, evtOld != null,
+ subjId,
+ null,
+ taskName,
+ keepBinary);
+ }
+ }
- if (op == GridCacheOperation.TRANSFORM) {
- EntryProcessor<Object, Object, ?> entryProcessor =
- (EntryProcessor<Object, Object, ?>)writeObj;
+ updateMetrics(c.op, metrics);
- CacheInvokeEntry<Object, Object> entry =
- new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
+ // Continuous query filter should be perform under lock.
+ if (lsnrs != null) {
+ CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+ CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
- try {
- entryProcessor.process(entry, invokeArgs);
+ cctx.continuousQueries().onEntryUpdated(lsnrs,
+ key,
+ evtVal,
+ evtOldVal,
+ internal,
+ partition(),
+ primary,
+ false,
+ c.updateRes.updateCounter(),
+ fut,
+ topVer);
+ }
- evtVal = entry.modified() ?
- cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
- }
- catch (Exception ignore) {
- evtVal = prevVal;
- }
- }
- else
- evtVal = (CacheObject)writeObj;
-
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
- onUpdateFinished(updateCntr0);
-
- cctx.continuousQueries().onEntryUpdated(
- key,
- evtVal,
- prevVal,
- isInternal() || !context().userCache(),
- partition(),
- primary,
- false,
- updateCntr0,
- null,
- topVer);
- }
+ cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
- return new GridCacheUpdateAtomicResult(false,
- retval ? this.val : null,
+ if (intercept) {
+ if (op == GridCacheOperation.UPDATE) {
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+ cctx,
+ key,
+ null,
+ updateVal,
null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
+ keepBinary,
+ c.updateRes.updateCounter()));
+ }
+ else {
+ cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+ cctx,
+ key,
null,
+ oldVal,
null,
- false,
- 0);
+ keepBinary,
+ c.updateRes.updateCounter()));
}
}
- else
- assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
- "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
- }
-
- // Apply metrics.
- if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
- // PutIfAbsent methods mustn't update hit/miss statistics
- if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
- cctx.cache().metrics0().onRead(oldVal != null);
}
+ }
- // Check filter inside of synchronization.
- if (!F.isEmptyOrNulls(filter)) {
- boolean pass = cctx.isAllLocked(this, filter);
-
- if (!pass) {
- if (expiryPlc != null && !readFromStore && hasValueUnlocked() && !cctx.putIfAbsentFilter(filter))
- updateTtl(expiryPlc);
+ onUpdateFinished(c.updateRes.updateCounter());
- return new GridCacheUpdateAtomicResult(false,
- retval ? oldVal : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- }
- }
+ return c.updateRes;
+ }
- // Calculate new value in case we met transform.
- if (op == GridCacheOperation.TRANSFORM) {
- assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier.";
+ /**
+ * @param val Value.
+ * @param cacheObj Cache object.
+ * @param keepBinary Keep binary flag.
+ * @param cpy Copy flag.
+ * @return Cache object value.
+ */
+ @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
+ if (val != null)
+ return val;
- transformClo = writeObj;
+ return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
+ }
- EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+ /**
+ * @param expiry Expiration policy.
+ * @return Tuple holding initial TTL and expire time with the given expiry.
+ */
+ private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
+ assert expiry != null;
- CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this);
+ long initTtl = expiry.forCreate();
+ long initExpireTime;
- try {
- Object computed = entryProcessor.process(entry, invokeArgs);
+ if (initTtl == CU.TTL_ZERO) {
+ initTtl = CU.TTL_MINIMUM;
+ initExpireTime = CU.expireTimeInPast();
+ }
+ else if (initTtl == CU.TTL_NOT_CHANGED) {
+ initTtl = CU.TTL_ETERNAL;
+ initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ }
+ else
+ initExpireTime = CU.toExpireTime(initTtl);
- if (entry.modified()) {
- updated0 = cctx.unwrapTemporary(entry.getValue());
- updated = cctx.toCacheObject(updated0);
- }
- else
- updated = oldVal;
+ return F.t(initTtl, initExpireTime);
+ }
- key0 = entry.key();
+ /**
+ * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time.
+ *
+ * @param expiry Expiration policy.
+ * @param ttl Explicit TTL.
+ * @param expireTime Explicit expire time.
+ * @return Result.
+ */
+ private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+ boolean rmv = false;
- if (computed != null)
- invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
- }
- catch (Exception e) {
- invokeRes = new IgniteBiTuple(null, e);
+ // 1. If TTL is not changed, then calculate it based on expiry.
+ if (ttl == CU.TTL_NOT_CHANGED) {
+ if (expiry != null)
+ ttl = hasValueUnlocked() ? expiry.forUpdate() : expiry.forCreate();
+ }
- updated = oldVal;
- }
+ // 2. If TTL is zero, then set delete marker.
+ if (ttl == CU.TTL_ZERO) {
+ rmv = true;
- if (!entry.modified()) {
- if (expiryPlc != null && !readFromStore && hasValueUnlocked())
- updateTtl(expiryPlc);
+ ttl = CU.TTL_ETERNAL;
+ }
- return new GridCacheUpdateAtomicResult(false,
- retval ? oldVal : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- }
+ // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
+ if (ttl == CU.TTL_NOT_CHANGED) {
+ if (isStartVersion())
+ ttl = CU.TTL_ETERNAL;
+ else {
+ ttl = ttlExtras();
+ expireTime = expireTimeExtras();
}
- else
- updated = (CacheObject)writeObj;
+ }
- op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
+ // 4 If expire time was not set explicitly, then calculate it.
+ if (expireTime == CU.EXPIRE_TIME_CALCULATE)
+ expireTime = CU.toExpireTime(ttl);
- assert op == GridCacheOperation.UPDATE || (op == GridCacheOperation.DELETE && updated == null);
+ return F.t(ttl, expireTime, rmv);
+ }
- boolean hadVal = hasValueUnlocked();
-
- // Incorporate conflict version into new version if needed.
- if (conflictVer != null && conflictVer != newVer)
- newVer = new GridCacheVersionEx(newVer.topologyVersion(),
- newVer.globalTime(),
- newVer.order(),
- newVer.nodeOrder(),
- newVer.dataCenterId(),
- conflictVer);
-
- if (op == GridCacheOperation.UPDATE) {
- // Conflict context is null if there were no explicit conflict resolution.
- if (conflictCtx == null) {
- // Calculate TTL and expire time for local update.
- if (explicitTtl != CU.TTL_NOT_CHANGED) {
- // If conflict existed, expire time must be explicit.
- assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
-
- newSysTtl = newTtl = explicitTtl;
- newSysExpireTime = explicitExpireTime;
-
- newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
- explicitExpireTime : CU.toExpireTime(explicitTtl);
- }
- else {
- newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
- hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
-
- if (newSysTtl == CU.TTL_NOT_CHANGED) {
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
- newTtl = ttlExtras();
- newExpireTime = expireTimeExtras();
- }
- else if (newSysTtl == CU.TTL_ZERO) {
- op = GridCacheOperation.DELETE;
-
- newSysTtl = CU.TTL_NOT_CHANGED;
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- newTtl = CU.TTL_ETERNAL;
- newExpireTime = CU.EXPIRE_TIME_ETERNAL;
-
- updated = null;
- }
- else {
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
- newTtl = newSysTtl;
- newExpireTime = CU.toExpireTime(newTtl);
- }
- }
- }
- else {
- newSysTtl = newTtl = conflictCtx.ttl();
- newSysExpireTime = newExpireTime = conflictCtx.expireTime();
- }
- }
- else {
- assert op == GridCacheOperation.DELETE;
-
- newSysTtl = CU.TTL_NOT_CHANGED;
- newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- newTtl = CU.TTL_ETERNAL;
- newExpireTime = CU.EXPIRE_TIME_ETERNAL;
- }
-
- // TTL and expire time must be resolved at this point.
- assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0;
- assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0;
-
- IgniteBiTuple<Boolean, Object> interceptRes = null;
-
- // Actual update.
- if (op == GridCacheOperation.UPDATE) {
- if (log.isTraceEnabled()) {
- log.trace("innerUpdate [key=" + key +
- ", entry=" + System.identityHashCode(this) + ']');
- }
-
- if (intercept) {
- updated0 = value(updated0, updated, keepBinary, false);
-
- Object interceptorVal = cctx.config().getInterceptor()
- .onBeforePut(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary), updated0);
-
- if (interceptorVal == null)
- return new GridCacheUpdateAtomicResult(false,
- retval ? oldVal : null,
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- else if (interceptorVal != updated0) {
- updated0 = cctx.unwrapTemporary(interceptorVal);
-
- updated = cctx.toCacheObject(updated0);
- }
- }
-
- // Try write-through.
- if (writeThrough)
- // Must persist inside synchronization in non-tx mode.
- cctx.store().put(null, key, updated, newVer);
-
- if (!hadVal) {
- boolean new0 = isNew();
-
- assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this + ", locNodeId=" +
- cctx.localNodeId() + ']';
-
- if (!new0 && !isInternal())
- deletedUnlocked(false);
- }
- else {
- assert !deletedUnlocked() : "Invalid entry [entry=" + this +
- ", locNodeId=" + cctx.localNodeId() + ']';
-
- // Do not change size.
- }
-
- updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
-
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
- logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
-
- storeValue(updated, newExpireTime, newVer, oldRow);
-
- update(updated, newExpireTime, newTtl, newVer, true);
-
- drReplicate(drType, updated, newVer, topVer);
-
- recordNodeId(affNodeId, topVer);
-
- if (evt) {
- CacheObject evtOld = null;
-
- if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- evtOld = cctx.unwrapTemporary(oldVal);
-
- transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null,
- newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
- evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
- keepBinary);
- }
-
- if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
- if (evtOld == null)
- evtOld = cctx.unwrapTemporary(oldVal);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null,
- newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
- evtOld != null || hadVal, subjId, null, taskName, keepBinary);
- }
- }
- }
- else {
- if (intercept) {
- interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0,
- oldVal, old0, keepBinary, updateCntr0));
-
- if (cctx.cancelRemove(interceptRes))
- return new GridCacheUpdateAtomicResult(false,
- cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
- null,
- invokeRes,
- CU.TTL_ETERNAL,
- CU.EXPIRE_TIME_ETERNAL,
- null,
- null,
- false,
- updateCntr0 == null ? 0 : updateCntr0);
- }
-
- if (writeThrough)
- // Must persist inside synchronization in non-tx mode.
- cctx.store().remove(null, key);
-
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
- logUpdate(op, null, newVer, 0, updateCntr0);
-
- removeValue();
-
- if (hadVal) {
- assert !deletedUnlocked();
-
- if (!isInternal())
- deletedUnlocked(true);
- }
- else {
- boolean new0 = isNew();
-
- assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + this + ", locNodeId=" +
- cctx.localNodeId() + ']';
-
- if (new0) {
- if (!isInternal())
- deletedUnlocked(true);
- }
- }
-
- enqueueVer = newVer;
-
- // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
- update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
-
- assert newSysTtl == CU.TTL_NOT_CHANGED;
- assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
-
- clearReaders();
-
- recordNodeId(affNodeId, topVer);
-
- drReplicate(drType, null, newVer, topVer);
-
- if (evt) {
- CacheObject evtOld = null;
-
- if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- evtOld = cctx.unwrapTemporary(oldVal);
-
- transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null,
- newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
- evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
- keepBinary);
- }
-
- if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
- if (evtOld == null)
- evtOld = cctx.unwrapTemporary(oldVal);
-
- cctx.events().addEvent(partition(), key, evtNodeId, null, newVer,
- EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal,
- subjId, null, taskName, keepBinary);
- }
- }
-
- res = hadVal;
- }
-
- if (res)
- updateMetrics(op, metrics);
-
- // Continuous query filter should be perform under lock.
- if (lsnrs != null) {
- CacheObject evtVal = cctx.unwrapTemporary(updated);
- CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
-
- cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
- partition(), primary, false, updateCntr0, fut, topVer);
- }
-
- cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
-
- if (intercept) {
- if (op == GridCacheOperation.UPDATE)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
- cctx,
- key,
- key0,
- updated,
- updated0,
- keepBinary,
- updateCntr0));
- else
- cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
- cctx,
- key,
- key0,
- oldVal,
- old0,
- keepBinary,
- updateCntr0));
-
- if (interceptRes != null)
- oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
- }
- }
-
- onUpdateFinished(updateCntr0);
-
- if (log.isDebugEnabled())
- log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']');
-
- return new GridCacheUpdateAtomicResult(res,
- oldVal,
- updated,
- invokeRes,
- newSysTtl,
- newSysExpireTime,
- enqueueVer,
- conflictCtx,
- true,
- updateCntr0 == null ? 0 : updateCntr0);
- }
-
- /**
- * @param val Value.
- * @param cacheObj Cache object.
- * @param keepBinary Keep binary flag.
- * @param cpy Copy flag.
- * @return Cache object value.
- */
- @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
- if (val != null)
- return val;
-
- return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
- }
-
- /**
- * @param expiry Expiration policy.
- * @return Tuple holding initial TTL and expire time with the given expiry.
- */
- private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
- assert expiry != null;
-
- long initTtl = expiry.forCreate();
- long initExpireTime;
-
- if (initTtl == CU.TTL_ZERO) {
- initTtl = CU.TTL_MINIMUM;
- initExpireTime = CU.expireTimeInPast();
- }
- else if (initTtl == CU.TTL_NOT_CHANGED) {
- initTtl = CU.TTL_ETERNAL;
- initExpireTime = CU.EXPIRE_TIME_ETERNAL;
- }
- else
- initExpireTime = CU.toExpireTime(initTtl);
-
- return F.t(initTtl, initExpireTime);
- }
-
- /**
- * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time.
- *
- * @param expiry Expiration policy.
- * @param ttl Explicit TTL.
- * @param expireTime Explicit expire time.
- * @return Result.
- */
- private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime)
- throws GridCacheEntryRemovedException {
- boolean rmv = false;
-
- // 1. If TTL is not changed, then calculate it based on expiry.
- if (ttl == CU.TTL_NOT_CHANGED) {
- if (expiry != null)
- ttl = hasValueUnlocked() ? expiry.forUpdate() : expiry.forCreate();
- }
-
- // 2. If TTL is zero, then set delete marker.
- if (ttl == CU.TTL_ZERO) {
- rmv = true;
-
- ttl = CU.TTL_ETERNAL;
- }
-
- // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
- if (ttl == CU.TTL_NOT_CHANGED) {
- if (isNew())
- ttl = CU.TTL_ETERNAL;
- else {
- ttl = ttlExtras();
- expireTime = expireTimeExtras();
- }
- }
-
- // 4 If expire time was not set explicitly, then calculate it.
- if (expireTime == CU.EXPIRE_TIME_CALCULATE)
- expireTime = CU.toExpireTime(ttl);
-
- return F.t(ttl, expireTime, rmv);
- }
-
- /**
- * Perform DR if needed.
- *
- * @param drType DR type.
- * @param val Value.
- * @param ver Version.
- * @param topVer Topology version.
- * @throws IgniteCheckedException In case of exception.
- */
- private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver, AffinityTopologyVersion topVer)
- throws IgniteCheckedException {
- if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
- cctx.dr().replicate(key, val, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType, topVer);
- }
+ /**
+ * Perform DR if needed.
+ *
+ * @param drType DR type.
+ * @param val Value.
+ * @param ver Version.
+ * @param topVer Topology version.
+ * @throws IgniteCheckedException In case of exception.
+ */
+ private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver, AffinityTopologyVersion topVer)
+ throws IgniteCheckedException {
+ if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
+ cctx.dr().replicate(key, val, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType, topVer);
+ }
/**
* @return {@code true} if entry has readers. It makes sense only for dht entry.
@@ -3027,6 +2598,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
+ * @return Update counter.
+ */
+ protected long nextPartCounter() {
+ return 0;
+ }
+
+ /**
* @param topVer Topology version.
* @return Update counter.
*/
@@ -4177,6 +3755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param key Key.
+ * @param keepBinary Keep binary flag.
*/
private LazyValueEntry(KeyCacheObject key, boolean keepBinary) {
this.key = key;
@@ -4223,4 +3802,710 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return "IteratorEntry [key=" + key + ']';
}
}
+
+ /**
+ *
+ */
+ private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+ /** */
+ private final GridCacheMapEntry entry;
+
+ /** */
+ private GridCacheVersion newVer;
+
+ /** */
+ private GridCacheOperation op;
+
+ /** */
+ private Object writeObj;
+
+ /** */
+ private Object[] invokeArgs;
+
+ /** */
+ private final boolean readThrough;
+
+ /** */
+ private final boolean writeThrough;
+
+ /** */
+ private final boolean keepBinary;
+
+ /** */
+ private final IgniteCacheExpiryPolicy expiryPlc;
+
+ /** */
+ private final boolean primary;
+
+ /** */
+ private final boolean verCheck;
+
+ /** */
+ private final CacheEntryPredicate[] filter;
+
+ /** */
+ private final long explicitTtl;
+
+ /** */
+ private final long explicitExpireTime;
+
+ /** */
+ private GridCacheVersion conflictVer;
+
+ /** */
+ private final boolean conflictResolve;
+
+ /** */
+ private final boolean intercept;
+
+ /** */
+ private final Long updateCntr;
+
+ /** */
+ private GridCacheUpdateAtomicResult updateRes;
+
+ /** */
+ private IgniteTree.OperationType treeOp;
+
+ /** */
+ private CacheDataRow newRow;
+
+ /** */
+ private CacheDataRow oldRow;
+
+ AtomicCacheUpdateClosure(GridCacheMapEntry entry,
+ GridCacheVersion newVer,
+ GridCacheOperation op,
+ Object writeObj,
+ Object[] invokeArgs,
+ boolean readThrough,
+ boolean writeThrough,
+ boolean keepBinary,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean primary,
+ boolean verCheck,
+ @Nullable CacheEntryPredicate[] filter,
+ long explicitTtl,
+ long explicitExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean conflictResolve,
+ boolean intercept,
+ @Nullable Long updateCntr) {
+ assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
+
+ this.entry = entry;
+ this.newVer = newVer;
+ this.op = op;
+ this.writeObj = writeObj;
+ this.invokeArgs = invokeArgs;
+ this.readThrough = readThrough;
+ this.writeThrough = writeThrough;
+ this.keepBinary = keepBinary;
+ this.expiryPlc = expiryPlc;
+ this.primary = primary;
+ this.verCheck = verCheck;
+ this.filter = filter;
+ this.explicitTtl = explicitTtl;
+ this.explicitExpireTime = explicitExpireTime;
+ this.conflictVer = conflictVer;
+ this.conflictResolve = conflictResolve;
+ this.intercept = intercept;
+ this.updateCntr = updateCntr;
+
+ switch (op) {
+ case UPDATE:
+ treeOp = IgniteTree.OperationType.PUT;
+
+ break;
+
+ case DELETE:
+ treeOp = IgniteTree.OperationType.REMOVE;
+
+ break;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public CacheDataRow oldRow() {
+ return oldRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheDataRow newRow() {
+ return newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTree.OperationType operationType() {
+ return treeOp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ assert oldRow == null || oldRow.link() != 0 : oldRow;
+
+ this.oldRow = oldRow;
+
+ GridCacheContext cctx = entry.context();
+
+ CacheObject oldVal;
+ CacheObject storeLoadedVal = null;
+
+ if (oldRow != null) {
+ oldVal = oldRow.value();
+
+ entry.update(oldVal, oldRow.expireTime(), 0, oldRow.version(), false);
+ }
+ else
+ oldVal = null;
+
+ if (oldVal == null && readThrough) {
+ storeLoadedVal = cctx.toCacheObject(cctx.store().load(null, entry.key));
+
+ if (storeLoadedVal != null) {
+ oldVal = cctx.kernalContext().cacheObjects().prepareForCache(storeLoadedVal, cctx);
+
+ entry.val = oldVal;
+
+ if (entry.deletedUnlocked())
+ entry.deletedUnlocked(false);
+ }
+ }
+
+ CacheInvokeEntry<Object, Object> invokeEntry = null;
+ IgniteBiTuple<Object, Exception> invokeRes = null;
+
+ if (op == TRANSFORM) {
+ invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry);
+
+ invokeRes = runEntryProcessor(invokeEntry);
+
+ op = writeObj == null ? DELETE : UPDATE;
+ }
+
+ CacheObject newVal = (CacheObject)writeObj;
+
+ GridCacheVersionConflictContext<?, ?> conflictCtx;
+
+ if (conflictResolve) {
+ conflictCtx = resolveConflict(newVal, invokeRes);
+
+ if (updateRes != null) {
+ assert conflictCtx != null && conflictCtx.isUseOld() : conflictCtx;
+ assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+ return;
+ }
+ }
+ else {
+ conflictCtx = null;
+
+ // Perform version check only in case there was no explicit conflict resolution.
+ versionCheck(invokeRes);
+
+ if (updateRes != null) {
+ assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+ return;
+ }
+ }
+
+ if (!F.isEmptyOrNulls(filter)) {
+ boolean pass = cctx.isAllLocked(entry, filter);
+
+ if (!pass) {
+ // TODO
+// if (expiryPlc != null && !readFromStore && entry.val != null && !cctx.putIfAbsentFilter(filter))
+// updateTtl(expiryPlc);
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+ }
+
+ if (op == TRANSFORM) {
+ assert invokeEntry != null;
+
+ if (!invokeEntry.modified()) {
+ // TODO
+// if (expiryPlc != null && !readFromStore && entry.val != null)
+// updateTtl(expiryPlc);
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+
+ op = writeObj == null ? DELETE : UPDATE;
+ }
+
+ // Incorporate conflict version into new version if needed.
+ if (conflictVer != null && conflictVer != newVer) {
+ newVer = new GridCacheVersionEx(newVer.topologyVersion(),
+ newVer.globalTime(),
+ newVer.order(),
+ newVer.nodeOrder(),
+ newVer.dataCenterId(),
+ conflictVer);
+ }
+
+ if (op == UPDATE)
+ update(conflictCtx, invokeRes);
+ else {
+ assert op == DELETE && writeObj == null : op;
+
+ remove(conflictCtx, invokeRes);
+ }
+
+ assert updateRes != null && treeOp != null;
+ }
+
+ /**
+ * @param conflictCtx Conflict context.
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+ throws IgniteCheckedException
+ {
+ GridCacheContext cctx = entry.context();
+
+ final CacheObject oldVal = entry.val;
+ CacheObject updated = (CacheObject)writeObj;
+
+ long newSysTtl;
+ long newSysExpireTime;
+
+ long newTtl;
+ long newExpireTime;
+
+ // Conflict context is null if there were no explicit conflict resolution.
+ if (conflictCtx == null) {
+ // Calculate TTL and expire time for local update.
+ if (explicitTtl != CU.TTL_NOT_CHANGED) {
+ // If conflict existed, expire time must be explicit.
+ assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
+
+ newSysTtl = newTtl = explicitTtl;
+ newSysExpireTime = explicitExpireTime;
+
+ newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
+ explicitExpireTime : CU.toExpireTime(explicitTtl);
+ }
+ else {
+ newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
+ entry.val != null ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+
+ if (newSysTtl == CU.TTL_NOT_CHANGED) {
+ newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ newTtl = entry.ttlExtras();
+ newExpireTime = entry.expireTimeExtras();
+ }
+ else if (newSysTtl == CU.TTL_ZERO) {
+ op = GridCacheOperation.DELETE;
+
+ newSysTtl = CU.TTL_NOT_CHANGED;
+ newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ newTtl = CU.TTL_ETERNAL;
+ newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+
+ updated = null;
+ }
+ else {
+ newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ newTtl = newSysTtl;
+ newExpireTime = CU.toExpireTime(newTtl);
+ }
+ }
+ }
+ else {
+ newSysTtl = newTtl = conflictCtx.ttl();
+ newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+ }
+
+ if (intercept) {
+ Object updated0 = entry.value(null, updated, keepBinary, false);
+
+ CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
+ entry.key,
+ null,
+ entry.val,
+ null,
+ keepBinary);
+
+ Object interceptorVal = cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0);
+
+ if (interceptorVal == null) {
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+ }
+ else if (interceptorVal != updated0) {
+ updated0 = cctx.unwrapTemporary(interceptorVal);
+
+ updated = cctx.toCacheObject(updated0);
+ }
+ }
+
+ updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
+
+ if (writeThrough)
+ // Must persist inside synchronization in non-tx mode.
+ cctx.store().put(null, entry.key, updated, newVer);
+
+ if (entry.val == null) {
+ boolean new0 = entry.isStartVersion();
+
+ assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry +
+ ", locNodeId=" + cctx.localNodeId() + ']';
+
+ if (!new0 && !entry.isInternal())
+ entry.deletedUnlocked(false);
+ }
+ else {
+ assert !entry.deletedUnlocked() : "Invalid entry [entry=" + this +
+ ", locNodeId=" + cctx.localNodeId() + ']';
+ }
+
+ long updateCntr0 = entry.nextPartCounter();
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
+ entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
+
+ newRow = entry.localPartition().dataStore().createRow(entry.key,
+ updated,
+ newVer,
+ newExpireTime,
+ oldRow);
+
+ entry.update(updated, newExpireTime, newTtl, newVer, true);
+
+ treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+ IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS,
+ oldVal,
+ updated,
+ invokeRes,
+ newSysTtl,
+ newSysExpireTime,
+ null,
+ conflictCtx,
+ updateCntr0);
+ }
+
+ /**
+ * @param conflictCtx Conflict context.
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+ throws IgniteCheckedException
+ {
+ GridCacheContext cctx = entry.context();
+
+ CacheObject oldVal = entry.val;
+
+ IgniteBiTuple<Boolean, Object> interceptRes = null;
+
+ if (intercept) {
+ CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx,
+ entry.key,
+ null,
+ oldVal, null,
+ keepBinary);
+
+ interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry);
+
+ if (cctx.cancelRemove(interceptRes)) {
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+ cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+
+ return;
+ }
+ }
+
+ if (writeThrough)
+ // Must persist inside synchronization in non-tx mode.
+ cctx.store().remove(null, entry.key);
+
+ long updateCntr0 = entry.nextPartCounter();
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
+ if (oldVal != null) {
+ assert !entry.deletedUnlocked();
+
+ if (!entry.isInternal())
+ entry.deletedUnlocked(true);
+ }
+ else {
+ boolean new0 = entry.isStartVersion();
+
+ assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + this +
+ ", locNodeId=" + cctx.localNodeId() + ']';
+
+ if (new0) {
+ if (!entry.isInternal())
+ entry.deletedUnlocked(true);
+ }
+ }
+
+ GridCacheVersion enqueueVer = newVer;
+
+ entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
+
+ treeOp = oldVal == null ? IgniteTree.OperationType.NOOP : IgniteTree.OperationType.REMOVE;
+
+ UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL;
+
+ if (interceptRes != null)
+ oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
+
+ updateRes = new GridCacheUpdateAtomicResult(outcome,
+ oldVal,
+ null,
+ invokeRes,
+ CU.TTL_NOT_CHANGED,
+ CU.EXPIRE_TIME_CALCULATE,
+ enqueueVer,
+ conflictCtx,
+ updateCntr0);
+ }
+
+ /**
+ * @param newVal New entry value.
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @return Conflict context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private GridCacheVersionConflictContext<?, ?> resolveConflict(
+ CacheObject newVal,
+ @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+ throws IgniteCheckedException
+ {
+ GridCacheContext cctx = entry.context();
+
+ // Cache is conflict-enabled.
+ if (cctx.conflictNeedResolve()) {
+ GridCacheVersion oldConflictVer = entry.ver.conflictVersion();
+
+ // Prepare old and new entries for conflict resolution.
+ GridCacheVersionedEntryEx oldEntry = new GridCacheLazyPlainVersionedEntry<>(cctx,
+ entry.key,
+ entry.val,
+ entry.ttlExtras(),
+ entry.expireTimeExtras(),
+ entry.ver.conflictVersion(),
+ entry.isStartVersion(),
+ keepBinary);
+
+ GridTuple3<Long, Long, Boolean> expiration = entry.ttlAndExpireTime(expiryPlc,
+ explicitTtl,
+ explicitExpireTime);
+
+ GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry<>(
+ cctx,
+ entry.key,
+ newVal,
+ expiration.get1(),
+ expiration.get2(),
+ conflictVer != null ? conflictVer : newVer,
+ keepBinary);
+
+ // Resolve conflict.
+ GridCacheVersionConflictContext<?, ?> conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+
+ assert conflictCtx != null;
+
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+ // Use old value?
+ if (conflictCtx.isUseOld()) {
+ GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+
+ // Handle special case with atomic comparator.
+ if (!entry.isStartVersion() && // Not initial value,
+ verCheck && // and atomic version check,
+ oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
+ ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
+ cctx.writeThrough() && // and store is enabled,
+ primary) // and we are primary.
+ {
+ CacheObject val = entry.val;
+
+ if (val == null) {
+ assert entry.deletedUnlocked();
+
+ cctx.store().remove(null, entry.key);
+ }
+ else
+ cctx.store().put(null, entry.key, val, entry.ver);
+ }
+
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.CONFLICT_USE_OLD,
+ entry.val,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+ }
+ // Will update something.
+ else {
+ // Merge is a local update which override passed value bytes.
+ if (conflictCtx.isMerge()) {
+ writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
+
+ conflictVer = null;
+ }
+ else
+ assert conflictCtx.isUseNew();
+
+ // Update value is known at this point, so update operation type.
+ op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+ }
+
+ return conflictCtx;
+ }
+ else
+ // Nullify conflict version on this update, so that we will use regular version during next updates.
+ conflictVer = null;
+
+ return null;
+ }
+
+ /**
+ * @param invokeRes Entry processor result (for invoke operation).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void versionCheck(@Nullable IgniteBiTuple<Object, Exception> invokeRes) throws IgniteCheckedException {
+ GridCacheContext cctx = entry.context();
+
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+ if (verCheck) {
+ if (!entry.isStartVersion() && ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) >= 0) {
+ if (ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
+ if (log.isDebugEnabled())
+ log.debug("Received entry update with same version as current (will update store) " +
+ "[entry=" + this + ", newVer=" + newVer + ']');
+
+ CacheObject val = entry.val;
+
+ if (val == null) {
+ assert entry.deletedUnlocked();
+
+ cctx.store().remove(null, entry.key);
+ }
+ else
+ cctx.store().put(null, entry.key, val, entry.ver);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Received entry update with smaller version than current (will ignore) " +
+ "[entry=" + this + ", newVer=" + newVer + ']');
+ }
+
+ treeOp = IgniteTree.OperationType.NOOP;
+
+ updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.VERSION_CHECK_FAILED,
+ entry.val,
+ null,
+ invokeRes,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ null,
+ null,
+ 0);
+ }
+ }
+ else
+ assert entry.isStartVersion() || ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) <= 0 :
+ "Invalid version for inner update [isNew=" + entry.isStartVersion() + ", entry=" + this + ", newVer=" + newVer + ']';
+ }
+
+ /**
+ * @param invokeEntry Entry for {@link EntryProcessor}.
+ * @return Entry processor return value.
+ */
+ @SuppressWarnings("unchecked")
+ private IgniteBiTuple<Object, Exception> runEntryProcessor(CacheInvokeEntry<Object, Object> invokeEntry) {
+ EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+
+ try {
+ Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+ if (invokeEntry.modified()) {
+ GridCacheContext cctx = entry.context();
+
+ writeObj = cctx.toCacheObject(cctx.unwrapTemporary(invokeEntry.getValue()));
+ }
+ else
+ writeObj = invokeEntry.valObj;
+
+ if (computed != null)
+ return new IgniteBiTuple<>(entry.cctx.unwrapTemporary(computed), null);
+
+ return null;
+ }
+ catch (Exception e) {
+ writeObj = invokeEntry.valObj;
+
+ return new IgniteBiTuple<>(null, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AtomicCacheUpdateClosure.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0cf2cf93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 2355b7c..97cb534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -29,8 +29,8 @@ import org.jetbrains.annotations.Nullable;
* Cache entry atomic update result.
*/
public class GridCacheUpdateAtomicResult {
- /** Success flag.*/
- private final boolean success;
+ /** Update operation outcome. */
+ private final UpdateOutcome outcome;
/** Old value. */
@GridToStringInclude
@@ -54,9 +54,6 @@ public class GridCacheUpdateAtomicResult {
@GridToStringInclude
private final GridCacheVersionConflictContext<?, ?> conflictRes;
- /** Whether update should be propagated to DHT node. */
- private final boolean sndToDht;
-
/** */
private final long updateCntr;
@@ -66,7 +63,7 @@ public class GridCacheUpdateAtomicResult {
/**
* Constructor.
*
- * @param success Success flag.
+ * @param outcome Update outcome.
* @param oldVal Old value.
* @param newVal New value.
* @param res Value computed by the {@link EntryProcessor}.
@@ -74,10 +71,9 @@ public class GridCacheUpdateAtomicResult {
* @param conflictExpireTime Explicit DR expire time (if any).
* @param rmvVer Version for deferred delete.
* @param conflictRes DR resolution result.
- * @param sndToDht Whether update should be propagated to DHT node.
* @param updateCntr Partition update counter.
*/
- public GridCacheUpdateAtomicResult(boolean success,
+ GridCacheUpdateAtomicResult(UpdateOutcome outcome,
@Nullable CacheObject oldVal,
@Nullable CacheObject newVal,
@Nullable IgniteBiTuple<Object, Exception> res,
@@ -85,9 +81,10 @@ public class GridCacheUpdateAtomicResult {
long conflictExpireTime,
@Nullable GridCacheVersion rmvVer,
@Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
- boolean sndToDht,
long updateCntr) {
- this.success = success;
+ assert outcome != null;
+
+ this.outcome = outcome;
this.oldVal = oldVal;
this.newVal = newVal;
this.res = res;
@@ -95,11 +92,17 @@ public class GridCacheUpdateAtomicResult {
this.conflictExpireTime = conflictExpireTime;
this.rmvVer = rmvVer;
this.conflictRes = conflictRes;
- this.sndToDht = sndToDht;
this.updateCntr = updateCntr;
}
/**
+ * @return Update operation outcome.
+ */
+ UpdateOutcome outcome() {
+ return outcome;
+ }
+
+ /**
* @return Value computed by the {@link EntryProcessor}.
*/
@Nullable public IgniteBiTuple<Object, Exception> computedResult() {
@@ -110,7 +113,7 @@ public class GridCacheUpdateAtomicResult {
* @return Success flag.
*/
public boolean success() {
- return success;
+ return outcome.success();
}
/**
@@ -167,7 +170,74 @@ public class GridCacheUpdateAtomicResult {
* @return Whether update should be propagated to DHT node.
*/
public boolean sendToDht() {
- return sndToDht;
+ return outcome.sendToDht();
+ }
+
+ /**
+ *
+ */
+ public enum UpdateOutcome {
+ /** */
+ CONFLICT_USE_OLD(false, false, false),
+
+ /** */
+ VERSION_CHECK_FAILED(false, false, false),
+
+ /** */
+ FILTER_FAILED(false, false, true),
+
+ /** */
+ INVOKE_NO_OP(false, false, true),
+
+ /** */
+ INTERCEPTOR_CANCEL(false, false, true),
+
+ /** */
+ REMOVE_NO_VAL(false, true, true),
+
+ /** */
+ SUCCESS(true, true, true);
+
+ /** */
+ private final boolean success;
+
+ /** */
+ private final boolean sndToDht;
+
+ /** */
+ private final boolean updateReadMetrics;
+
+ /**
+ * @param success Success flag.
+ * @param sndToDht Whether update should be propagated to DHT node.
+ * @param updateReadMetrics Metrics update flag.
+ */
+ UpdateOutcome(boolean success, boolean sndToDht, boolean updateReadMetrics) {
+ this.success = success;
+ this.sndToDht = sndToDht;
+ this.updateReadMetrics = updateReadMetrics;
+ }
+
+ /**
+ * @return Success flag.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /**
+ * @return Whether update should be propagated to DHT node.
+ */
+ public boolean sendToDht() {
+ return sndToDht;
+ }
+
+ /**
+ * @return Metrics update flag.
+ */
+ public boolean updateReadMetrics() {
+ return updateReadMetrics;
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0cf2cf93/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a869b21..6143752 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
@@ -107,6 +108,15 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
public long expiredSize() throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @param part Partition.
+ * @param c Tree update closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void invoke(KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c)
+ throws IgniteCheckedException;
+
+ /**
* @param key Key.
* @param val Value.
* @param ver Version.
@@ -253,6 +263,16 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
/**
*
*/
+ interface OffheapInvokeClosure extends IgniteTree.InvokeClosure<CacheDataRow> {
+ /**
+ * @return Old row.
+ */
+ @Nullable public CacheDataRow oldRow();
+ }
+
+ /**
+ *
+ */
interface CacheDataStore {
/**
* @return Partition ID.
@@ -297,6 +317,21 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
/**
* @param key Key.
+ * @param val Value.
+ * @param ver Version.
+ * @param expireTime Expire time.
+ * @param oldRow Old row.
+ * @return New row.
+ * @throws IgniteCheckedException If failed.
+ */
+ CacheDataRow createRow(KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ long expireTime,
+ @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
+
+ /**
+ * @param key Key.
* @param part Partition.
* @param val Value.
* @param ver Version.
@@ -313,6 +348,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
/**
* @param key Key.
+ * @param c Closure.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
+
+ /**
+ * @param key Key.
* @param partId Partition number.
* @throws IgniteCheckedException If failed.
*/