Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/04 20:26:34 UTC
ignite git commit: Speculation: path for single key update.
Repository: ignite
Updated Branches:
refs/heads/ignite-atomic-good-lock-bench d4cff4545 -> 573145090
Speculation: path for single key update.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57314509
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57314509
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57314509
Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 5731450902d936d2b34bfb05469397b2cc65b9f7
Parents: d4cff45
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 22:26:25 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 22:26:25 2016 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 485 ++++++++++++++++++-
1 file changed, 483 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
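
The change below routes single-key update requests to a dedicated code path,
bypassing the per-batch bookkeeping of the multi-key path. A minimal,
self-contained sketch of the dispatch idea (toy types and names, not the
Ignite API):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Toy cache showing the single-key fast-path dispatch pattern. */
    public class SingleKeyFastPath<K, V> {
        private final Map<K, V> data = new ConcurrentHashMap<>();

        /** Batch entry point; a one-key request skips the batch machinery. */
        public void updateAll(List<K> keys, V val) {
            if (keys.size() == 1) {
                // Fast path: no iteration, no per-batch collections.
                updateSingle(keys.get(0), val);

                return;
            }

            for (K key : keys)
                updateSingle(key, val);
        }

        private void updateSingle(K key, V val) {
            data.put(key, val);
        }
    }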
http://git-wip-us.apache.org/repos/asf/ignite/blob/57314509/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dad00ed..d7c0e72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1321,6 +1321,185 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
+ public void updateAllAsyncInternal1(
+ UUID nodeId,
+ GridNearAtomicUpdateRequest req,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ ) {
+ KeyCacheObject key = req.keys().get(0);
+
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+ ctx.deploymentEnabled());
+
+ assert !req.returnValue() || (req.operation() == TRANSFORM);
+
+ GridDhtAtomicUpdateFuture dhtFut = null;
+
+ boolean remap = false;
+
+ String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+ IgniteCacheExpiryPolicy expiry = null;
+
+ try {
+ // First, acquire the java-level lock on the cache entry,
+ // then check the filter.
+ GridDhtCacheEntry locked = lockEntry(key, req.topologyVersion());
+
+ Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+
+ try {
+ GridDhtPartitionTopology top = topology();
+
+ top.readLock();
+
+ try {
+ if (top.stopping()) {
+ res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
+ "(cache is stopped): " + name()));
+
+ completionCb.apply(req, res);
+
+ return;
+ }
+
+ // Do not check the topology version for CLOCK versioning, since partition
+ // exchange will wait for the near update future (if the future is on a server node).
+ // Also do not check the topology version if the topology was locked on the near
+ // node by an external transaction or an explicit lock.
+ if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+ !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ U.warn(log, "Node that originated the update request left the grid: " + nodeId);
+
+ return;
+ }
+
+ boolean hasNear = ctx.discovery().cacheNearNode(node, name());
+
+ GridCacheVersion ver = req.updateVersion();
+
+ if (ver == null) {
+ // Assign the next version for the update while holding the entry lock.
+ ver = ctx.versions().next(top.topologyVersion());
+
+ if (hasNear)
+ res.nearVersion(ver);
+ }
+
+ assert ver != null : "Got null version for update request: " + req;
+
+ if (log.isDebugEnabled())
+ log.debug("Using cache version for update request on primary node [ver=" + ver +
+ ", req=" + req + ']');
+
+ boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
+ dhtFut = createDhtFuture(ver, req, res, completionCb, false);
+
+ expiry = expiryPolicy(req.expiry());
+
+ GridCacheReturn retVal = null;
+
+ UpdateSingleResult updRes = updateSingle(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ ctx.isDrEnabled(),
+ taskName,
+ expiry,
+ sndPrevVal);
+
+ retVal = updRes.returnValue();
+ deleted = updRes.deleted();
+ dhtFut = updRes.dhtFuture();
+
+ if (retVal == null)
+ retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
+
+ res.returnValue(retVal);
+
+ if (req.writeSynchronizationMode() != FULL_ASYNC)
+ req.cleanup(!node.isLocal());
+
+ if (dhtFut != null)
+ ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+ }
+ else
+ // Should remap all keys.
+ remap = true;
+ }
+ finally {
+ top.readUnlock();
+ }
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert false : "Entry should not become obsolete while holding lock.";
+
+ U.error(log, "Entry unexpectedly became obsolete while holding lock.", e);
+ }
+ finally {
+ if (locked != null)
+ unlockEntry(locked, req.topologyVersion());
+
+ // Enqueue deferred deletes, if any, after the lock is released.
+ if (deleted != null) {
+ assert !deleted.isEmpty();
+ assert ctx.deferredDelete() : this;
+
+ for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted)
+ ctx.onDeferredDelete(e.get1(), e.get2());
+ }
+ }
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ assert !req.fastMap() || req.clientRequest() : req;
+
+ if (log.isDebugEnabled())
+ log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
+
+ remap = true;
+ }
+ catch (Throwable e) {
+ // A RuntimeException, at the very least, can be thrown by the code above when
+ // GridCacheContext has been cleaned up and an attempt is made to use its resources.
+ U.error(log, "Unexpected exception during cache update", e);
+
+ res.addFailedKeys(req.keys(), e);
+
+ completionCb.apply(req, res);
+
+ if (e instanceof Error)
+ throw e;
+
+ return;
+ }
+
+ if (remap) {
+ assert dhtFut == null;
+
+ res.remapKeys(req.keys());
+
+ completionCb.apply(req, res);
+ }
+ else {
+ // If there are backups, map backup update future.
+ if (dhtFut != null)
+ dhtFut.map();
+ // Otherwise, complete the call.
+ else
+ completionCb.apply(req, res);
+ }
+
+ sendTtlUpdateRequest(expiry);
+ }
+
/**
* Executes local update after preloader fetched values.
*
@@ -1333,11 +1512,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateRequest req,
CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
) {
+ List<KeyCacheObject> keys = req.keys();
+
+ if (keys.size() == 1) {
+ updateAllAsyncInternal1(nodeId, req, completionCb);
+
+ return;
+ }
+
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
- List<KeyCacheObject> keys = req.keys();
-
assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
GridDhtAtomicUpdateFuture dhtFut = null;
@@ -2202,6 +2387,248 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * Updates a single locked entry.
+ *
+ * @param node Originating node.
+ * @param hasNear {@code True} if originating node has near cache.
+ * @param req Update request.
+ * @param res Update response.
+ * @param entry Locked entry.
+ * @param ver Assigned update version.
+ * @param dhtFut Optional DHT future.
+ * @param completionCb Completion callback to invoke when DHT future is completed.
+ * @param replicate Whether DR is enabled for that cache.
+ * @param taskName Task name.
+ * @param expiry Expiry policy.
+ * @param sndPrevVal If {@code true} sends previous value to backups.
+ * @return Return value.
+ * @throws GridCacheEntryRemovedException Should never be thrown.
+ */
+ private UpdateSingleResult updateSingle(
+ ClusterNode node,
+ boolean hasNear,
+ GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateResponse res,
+ GridDhtCacheEntry entry,
+ GridCacheVersion ver,
+ @Nullable GridDhtAtomicUpdateFuture dhtFut,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ boolean replicate,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ boolean sndPrevVal
+ ) throws GridCacheEntryRemovedException {
+ GridCacheReturn retVal = null;
+ Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+
+ KeyCacheObject k = req.keys().get(0);
+
+ AffinityTopologyVersion topVer = req.topologyVersion();
+
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+
+ boolean readersOnly = false;
+
+ boolean intercept = ctx.config().getInterceptor() != null;
+
+ boolean initLsnrs = false;
+ Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+ boolean internal = false;
+
+ GridCacheOperation op = req.operation();
+
+ // We are holding the java-level lock on the entry at this point,
+ // so no GridCacheEntryRemovedException can be thrown.
+ try {
+ if (entry != null) {
+ if (!initLsnrs) {
+ internal = entry.isInternal() || !context().userCache();
+
+ lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+ initLsnrs = true;
+ }
+
+ GridCacheVersion newConflictVer = req.conflictVersion(0);
+ long newConflictTtl = req.conflictTtl(0);
+ long newConflictExpireTime = req.conflictExpireTime(0);
+
+ assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
+
+ boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
+ req.topologyVersion());
+
+ Object writeVal = op == TRANSFORM ? req.entryProcessor(0) : req.writeValue(0);
+
+ Collection<UUID> readers = null;
+ Collection<UUID> filteredReaders = null;
+
+ if (checkReaders) {
+ readers = entry.readers();
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ }
+
+ GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+ ver,
+ node.id(),
+ locNodeId,
+ op,
+ writeVal,
+ req.invokeArguments(),
+ primary && writeThrough() && !req.skipStore(),
+ !req.skipStore(),
+ lsnrs != null || sndPrevVal || req.returnValue(),
+ req.keepBinary(),
+ expiry,
+ true,
+ true,
+ primary,
+ ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+ topVer,
+ req.filter(),
+ replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ newConflictTtl,
+ newConflictExpireTime,
+ newConflictVer,
+ true,
+ intercept,
+ req.subjectId(),
+ taskName,
+ null,
+ null);
+
+ if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+ dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+
+ readersOnly = true;
+ }
+
+ if (dhtFut != null) {
+ dhtFut.listeners(lsnrs);
+
+ if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
+ GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
+
+ if (conflictCtx == null)
+ newConflictVer = null;
+ else if (conflictCtx.isMerge())
+ newConflictVer = null; // Conflict version is discarded in case of merge.
+
+ EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+ if (!readersOnly) {
+ dhtFut.addWriteEntry(entry,
+ updRes.newValue(),
+ entryProcessor,
+ updRes.newTtl(),
+ updRes.conflictExpireTime(),
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
+ }
+
+ if (!F.isEmpty(filteredReaders))
+ dhtFut.addNearWriteEntries(filteredReaders,
+ entry,
+ updRes.newValue(),
+ entryProcessor,
+ updRes.newTtl(),
+ updRes.conflictExpireTime());
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
+ "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
+ }
+ }
+ else if (lsnrs != null && updRes.success()) {
+ ctx.continuousQueries().onEntryUpdated(
+ lsnrs,
+ entry.key(),
+ updRes.newValue(),
+ updRes.oldValue(),
+ internal,
+ entry.partition(),
+ primary,
+ false,
+ updRes.updateCounter(),
+ topVer);
+ }
+
+ if (hasNear) {
+ if (primary && updRes.sendToDht()) {
+ if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
+ // If the same value as in the request was put, there is no need to send it back.
+ if (op == TRANSFORM || writeVal != updRes.newValue()) {
+ res.addNearValue(0,
+ updRes.newValue(),
+ updRes.newTtl(),
+ updRes.conflictExpireTime());
+ }
+ else
+ res.addNearTtl(0, updRes.newTtl(), updRes.conflictExpireTime());
+
+ if (updRes.newValue() != null) {
+ IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+
+ assert f == null : f;
+ }
+ }
+ else if (F.contains(readers, node.id())) // Reader became primary or backup.
+ entry.removeReader(node.id(), req.messageId());
+ else
+ res.addSkippedIndex(0);
+ }
+ else
+ res.addSkippedIndex(0);
+ }
+
+ if (updRes.removeVersion() != null) {
+ if (deleted == null)
+ deleted = new ArrayList<>(1);
+
+ deleted.add(F.t(entry, updRes.removeVersion()));
+ }
+
+ if (op == TRANSFORM) {
+ assert !req.returnValue();
+
+ IgniteBiTuple<Object, Exception> compRes = updRes.computedResult();
+
+ if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
+ if (retVal == null)
+ retVal = new GridCacheReturn(node.isLocal());
+
+ retVal.addEntryProcessResult(ctx,
+ k,
+ null,
+ compRes.get1(),
+ compRes.get2());
+ }
+ }
+ else {
+ // Create only once.
+ if (retVal == null) {
+ CacheObject ret = updRes.oldValue();
+
+ retVal = new GridCacheReturn(ctx,
+ node.isLocal(),
+ req.keepBinary(),
+ req.returnValue() ? ret : null,
+ updRes.success());
+ }
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKey(k, e);
+ }
+
+ return new UpdateSingleResult(retVal, deleted, dhtFut);
+ }
+
+ /**
* @param hasNear {@code True} if originating node has near cache.
* @param firstEntryIdx Index of the first entry in the request keys collection.
* @param entries Entries to update.
@@ -2567,6 +2994,60 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
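+ /**
+ * Acquires java-level lock on a single cache entry, retrying while the entry is obsolete.
+ *
+ * @param key Key.
+ * @param topVer Topology version.
+ * @return Locked entry, or {@code null} in CLOCK ordering mode when the partition is invalid.
+ * @throws GridDhtInvalidPartitionException If the partition is invalid (non-CLOCK modes).
+ */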
+ private GridDhtCacheEntry lockEntry(KeyCacheObject key, AffinityTopologyVersion topVer)
+ throws GridDhtInvalidPartitionException {
+ while (true) {
+ try {
+ GridDhtCacheEntry entry = entryExx(key, topVer);
+
+ GridUnsafe.monitorEnter(entry);
+
+ if (entry.obsolete())
+ GridUnsafe.monitorExit(entry);
+ else
+ return entry;
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Ignore invalid partition exception in CLOCK ordering mode.
+ if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+ return null;
+ else
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Releases the java-level lock on a single cache entry.
+ *
+ * @param entry Locked entry.
+ * @param topVer Topology version.
+ */
+ private void unlockEntry(GridDhtCacheEntry entry, AffinityTopologyVersion topVer) {
+ if (entry == null)
+ return;
+
+ // Process the deleted entry before the lock is released.
+ assert ctx.deferredDelete() : this;
+
+ // Whether to skip the eviction manager notification for this entry
+ // (decided while the java-level lock is still held).
+ boolean skip = false;
+
+ try {
+ if (entry.deleted())
+ skip = true;
+ }
+ finally {
+ GridUnsafe.monitorExit(entry);
+ }
+
+ entry.onUnlock();
+
+ if (!skip)
+ ctx.evicts().touch(entry, topVer);
+ }
+
/**
* Releases java-level locks on cache entries.
*
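
The lockEntry()/unlockEntry() pair above implements per-entry java-level
locking with a retry when the entry turns out to be obsolete. A self-contained
sketch of that pattern, using a ReentrantLock in place of Ignite's
GridUnsafe.monitorEnter/monitorExit (toy types, not the Ignite API):

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Supplier;

    /** Toy entry: a lock plus an "obsolete" flag set when the entry is removed. */
    class ToyEntry {
        final ReentrantLock lock = new ReentrantLock();
        volatile boolean obsolete;
    }

    class ToyLocker {
        /** Mirrors lockEntry(): retry until a live (non-obsolete) entry is locked. */
        static ToyEntry lockEntry(Supplier<ToyEntry> lookup) {
            while (true) {
                ToyEntry e = lookup.get(); // Re-resolve the entry on each attempt.

                e.lock.lock();

                if (e.obsolete)
                    e.lock.unlock();       // Lost a race with removal; retry.
                else
                    return e;              // Holding the lock on a live entry.
            }
        }

        /** Mirrors unlockEntry(): always release the lock, then run post-unlock work. */
        static void unlockEntry(ToyEntry e, Runnable postUnlock) {
            boolean skip;

            try {
                skip = e.obsolete;         // Decide on follow-up work while still locked.
            }
            finally {
                e.lock.unlock();
            }

            if (!skip)
                postUnlock.run();          // E.g. the eviction-manager touch in the real code.
        }
    }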