You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/04 20:26:34 UTC

ignite git commit: Speculation: path for single key update.

Repository: ignite
Updated Branches:
  refs/heads/ignite-atomic-good-lock-bench d4cff4545 -> 573145090


Speculation: path for single key update.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57314509
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57314509
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57314509

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 5731450902d936d2b34bfb05469397b2cc65b9f7
Parents: d4cff45
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 22:26:25 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 22:26:25 2016 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 485 ++++++++++++++++++-
 1 file changed, 483 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/57314509/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dad00ed..d7c0e72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1321,6 +1321,185 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
+    /**
+     * Executes local update for a request that carries exactly one key.
+     * <p>
+     * Locks the entry for the key at index 0, applies the update while holding the topology read lock and then
+     * either maps the DHT future, asks the near node to remap, or invokes the completion callback directly.
+     *
+     * @param nodeId ID of the node that originated the update request.
+     * @param req Update request; only the key at index 0 is processed.
+     * @param completionCb Callback to invoke with the response when no DHT future takes over completion.
+     */
+    public void updateAllAsyncInternal1(
+        UUID nodeId,
+        GridNearAtomicUpdateRequest req,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+    ) {
+        KeyCacheObject key = req.keys().get(0);
+
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+            ctx.deploymentEnabled());
+
+        // A return value may be requested with any operation on this path (the batch path allows it whenever
+        // the request has a single key), so only the single-key precondition is asserted here.
+        assert req.keys().size() == 1 : req;
+
+        GridDhtAtomicUpdateFuture dhtFut = null;
+
+        boolean remap = false;
+
+        String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+        IgniteCacheExpiryPolicy expiry = null;
+
+        try {
+            // First, need to acquire the java-level lock on the cache entry, then check filter.
+            GridDhtCacheEntry locked = lockEntry(key, req.topologyVersion());
+
+            Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+
+            try {
+                GridDhtPartitionTopology top = topology();
+
+                top.readLock();
+
+                try {
+                    if (top.stopping()) {
+                        res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
+                            "(cache is stopped): " + name()));
+
+                        completionCb.apply(req, res);
+
+                        return;
+                    }
+
+                    // Do not check topology version for CLOCK versioning since
+                    // partition exchange will wait for near update future (if future is on server node).
+                    // Also do not check topology version if topology was locked on near node by
+                    // external transaction or explicit lock.
+                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                        ClusterNode node = ctx.discovery().node(nodeId);
+
+                        if (node == null) {
+                            U.warn(log, "Node originated update request left grid: " + nodeId);
+
+                            // NOTE(review): the completion callback is not invoked on this path.
+                            return;
+                        }
+
+                        boolean hasNear = ctx.discovery().cacheNearNode(node, name());
+
+                        GridCacheVersion ver = req.updateVersion();
+
+                        if (ver == null) {
+                            // Assign next version for update inside entry lock.
+                            ver = ctx.versions().next(top.topologyVersion());
+
+                            if (hasNear)
+                                res.nearVersion(ver);
+                        }
+
+                        assert ver != null : "Got null version for update request: " + req;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Using cache version for update request on primary node [ver=" + ver +
+                                ", req=" + req + ']');
+
+                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
+                        dhtFut = createDhtFuture(ver, req, res, completionCb, false);
+
+                        expiry = expiryPolicy(req.expiry());
+
+                        UpdateSingleResult updRes = updateSingle(node,
+                            hasNear,
+                            req,
+                            res,
+                            locked,
+                            ver,
+                            dhtFut,
+                            completionCb,
+                            ctx.isDrEnabled(),
+                            taskName,
+                            expiry,
+                            sndPrevVal);
+
+                        GridCacheReturn retVal = updRes.returnValue();
+                        deleted = updRes.deleted();
+                        dhtFut = updRes.dhtFuture();
+
+                        if (retVal == null)
+                            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
+
+                        res.returnValue(retVal);
+
+                        if (req.writeSynchronizationMode() != FULL_ASYNC)
+                            req.cleanup(!node.isLocal());
+
+                        if (dhtFut != null)
+                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+                    }
+                    else
+                        // Should remap the key.
+                        remap = true;
+                }
+                finally {
+                    top.readUnlock();
+                }
+            }
+            catch (GridCacheEntryRemovedException e) {
+                assert false : "Entry should not become obsolete while holding lock.";
+
+                // Report through the cache logger, as the other failure paths of this method do.
+                U.error(log, "Entry should not become obsolete while holding lock.", e);
+            }
+            finally {
+                if (locked != null)
+                    unlockEntry(locked, req.topologyVersion());
+
+                // Enqueue if necessary after lock release.
+                if (deleted != null) {
+                    assert !deleted.isEmpty();
+                    assert ctx.deferredDelete() : this;
+
+                    for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted)
+                        ctx.onDeferredDelete(e.get1(), e.get2());
+                }
+            }
+        }
+        catch (GridDhtInvalidPartitionException ignore) {
+            assert !req.fastMap() || req.clientRequest() : req;
+
+            if (log.isDebugEnabled())
+                log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
+
+            remap = true;
+        }
+        catch (Throwable e) {
+            // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
+            // an attempt to use cleaned resources.
+            U.error(log, "Unexpected exception during cache update", e);
+
+            res.addFailedKeys(req.keys(), e);
+
+            completionCb.apply(req, res);
+
+            if (e instanceof Error)
+                throw e;
+
+            return;
+        }
+
+        if (remap) {
+            assert dhtFut == null;
+
+            res.remapKeys(req.keys());
+
+            completionCb.apply(req, res);
+        }
+        else {
+            // If there are backups, map backup update future.
+            if (dhtFut != null)
+                dhtFut.map();
+            // Otherwise, complete the call.
+            else
+                completionCb.apply(req, res);
+        }
+
+        sendTtlUpdateRequest(expiry);
+    }
+
     /**
      * Executes local update after preloader fetched values.
      *
@@ -1333,11 +1512,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateRequest req,
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
+        List<KeyCacheObject> keys = req.keys();
+
+        if (keys.size() == 1) {
+            updateAllAsyncInternal1(nodeId, req, completionCb);
+
+            return;
+        }
+
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
 
-        List<KeyCacheObject> keys = req.keys();
-
         assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
 
         GridDhtAtomicUpdateFuture dhtFut = null;
@@ -2202,6 +2387,248 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Updates the single locked entry addressed by the request (key and values at index 0).
+     *
+     * @param node Originating node.
+     * @param hasNear {@code True} if originating node has near cache.
+     * @param req Update request.
+     * @param res Update response.
+     * @param entry Locked entry; when {@code null} no update is performed.
+     * @param ver Assigned update version.
+     * @param dhtFut Optional DHT future.
+     * @param completionCb Completion callback to invoke when DHT future is completed.
+     * @param replicate Whether DR is enabled for that cache.
+     * @param taskName Task name.
+     * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
+     * @return Result holding the return value, the entries to enqueue for deferred delete and the DHT future.
+     * @throws GridCacheEntryRemovedException Should be never thrown.
+     */
+    private UpdateSingleResult updateSingle(
+        ClusterNode node,
+        boolean hasNear,
+        GridNearAtomicUpdateRequest req,
+        GridNearAtomicUpdateResponse res,
+        GridDhtCacheEntry entry,
+        GridCacheVersion ver,
+        @Nullable GridDhtAtomicUpdateFuture dhtFut,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        boolean replicate,
+        String taskName,
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
+    ) throws GridCacheEntryRemovedException {
+        GridCacheReturn retVal = null;
+        Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
+
+        KeyCacheObject k = req.keys().get(0);
+
+        AffinityTopologyVersion topVer = req.topologyVersion();
+
+        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+
+        boolean readersOnly = false;
+
+        boolean intercept = ctx.config().getInterceptor() != null;
+
+        boolean initLsnrs = false;
+        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+        boolean internal = false;
+
+        GridCacheOperation op = req.operation();
+
+        // We are holding the java-level lock on the entry at this point.
+        // No GridCacheEntryRemovedException can be thrown.
+        try {
+            if (entry != null) {
+                // Always true here: the flag is initialized to false just above and not changed in between.
+                if (!initLsnrs) {
+                    internal = entry.isInternal() || !context().userCache();
+
+                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+                    initLsnrs = true;
+                }
+
+                GridCacheVersion newConflictVer = req.conflictVersion(0);
+                long newConflictTtl = req.conflictTtl(0);
+                long newConflictExpireTime = req.conflictExpireTime(0);
+
+                assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
+
+                boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
+                    req.topologyVersion());
+
+                // For TRANSFORM the entry processor is passed in place of the value to write.
+                Object writeVal = op == TRANSFORM ? req.entryProcessor(0) : req.writeValue(0);
+
+                Collection<UUID> readers = null;
+                Collection<UUID> filteredReaders = null;
+
+                if (checkReaders) {
+                    readers = entry.readers();
+                    // Readers other than the originating node.
+                    filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+                }
+
+                GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                    ver,
+                    node.id(),
+                    locNodeId,
+                    op,
+                    writeVal,
+                    req.invokeArguments(),
+                    primary && writeThrough() && !req.skipStore(),
+                    !req.skipStore(),
+                    lsnrs != null || sndPrevVal || req.returnValue(),
+                    req.keepBinary(),
+                    expiry,
+                    true,
+                    true,
+                    primary,
+                    ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+                    topVer,
+                    req.filter(),
+                    replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+                    newConflictTtl,
+                    newConflictExpireTime,
+                    newConflictVer,
+                    true,
+                    intercept,
+                    req.subjectId(),
+                    taskName,
+                    null,
+                    null);
+
+                // No DHT future was passed in, but there are other readers to notify: create one for them only.
+                if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+                    dhtFut = createDhtFuture(ver, req, res, completionCb, true);
+
+                    readersOnly = true;
+                }
+
+                if (dhtFut != null) {
+                    dhtFut.listeners(lsnrs);
+
+                    if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
+                        GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
+
+                        if (conflictCtx == null)
+                            newConflictVer = null;
+                        else if (conflictCtx.isMerge())
+                            newConflictVer = null; // Conflict version is discarded in case of merge.
+
+                        // Never assigned: null is always passed as the entry processor below.
+                        EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+                        if (!readersOnly) {
+                            dhtFut.addWriteEntry(entry,
+                                updRes.newValue(),
+                                entryProcessor,
+                                updRes.newTtl(),
+                                updRes.conflictExpireTime(),
+                                newConflictVer,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
+                        }
+
+                        if (!F.isEmpty(filteredReaders))
+                            dhtFut.addNearWriteEntries(filteredReaders,
+                                entry,
+                                updRes.newValue(),
+                                entryProcessor,
+                                updRes.newTtl(),
+                                updRes.conflictExpireTime());
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
+                                "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
+                    }
+                }
+                else if (lsnrs != null && updRes.success()) {
+                    // No DHT future: notify continuous query listeners directly.
+                    ctx.continuousQueries().onEntryUpdated(
+                        lsnrs,
+                        entry.key(),
+                        updRes.newValue(),
+                        updRes.oldValue(),
+                        internal,
+                        entry.partition(),
+                        primary,
+                        false,
+                        updRes.updateCounter(),
+                        topVer);
+                }
+
+                if (hasNear) {
+                    if (primary && updRes.sendToDht()) {
+                        if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
+                            // If put the same value as in request then do not need to send it back.
+                            if (op == TRANSFORM || writeVal != updRes.newValue()) {
+                                res.addNearValue(0,
+                                    updRes.newValue(),
+                                    updRes.newTtl(),
+                                    updRes.conflictExpireTime());
+                            }
+                            else
+                                res.addNearTtl(0, updRes.newTtl(), updRes.conflictExpireTime());
+
+                            if (updRes.newValue() != null) {
+                                IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+
+                                assert f == null : f;
+                            }
+                        }
+                        else if (F.contains(readers, node.id())) // Reader became primary or backup.
+                            entry.removeReader(node.id(), req.messageId());
+                        else
+                            res.addSkippedIndex(0);
+                    }
+                    else
+                        res.addSkippedIndex(0);
+                }
+
+                // Collect the entry so the caller can enqueue it for deferred delete.
+                if (updRes.removeVersion() != null) {
+                    if (deleted == null)
+                        deleted = new ArrayList<>(1);
+
+                    deleted.add(F.t(entry, updRes.removeVersion()));
+                }
+
+                if (op == TRANSFORM) {
+                    assert !req.returnValue();
+
+                    IgniteBiTuple<Object, Exception> compRes = updRes.computedResult();
+
+                    if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
+                        if (retVal == null)
+                            retVal = new GridCacheReturn(node.isLocal());
+
+                        retVal.addEntryProcessResult(ctx,
+                            k,
+                            null,
+                            compRes.get1(),
+                            compRes.get2());
+                    }
+                }
+                else {
+                    // retVal is still null here: it is only assigned in the TRANSFORM branch above.
+                    if (retVal == null) {
+                        CacheObject ret = updRes.oldValue();
+
+                        retVal = new GridCacheReturn(ctx,
+                            node.isLocal(),
+                            req.keepBinary(),
+                            req.returnValue() ? ret : null,
+                            updRes.success());
+                    }
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            // Report the failure for this key in the response instead of propagating it.
+            res.addFailedKey(k, e);
+        }
+
+        return new UpdateSingleResult(retVal, deleted, dhtFut);
+    }
+
+    /**
      * @param hasNear {@code True} if originating node has near cache.
      * @param firstEntryIdx Index of the first entry in the request keys collection.
      * @param entries Entries to update.
@@ -2567,6 +2994,60 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
+    /**
+     * Acquires java-level lock on the cache entry for the given key.
+     * <p>
+     * Looks the entry up and enters its monitor, repeating until a non-obsolete entry is locked.
+     *
+     * @param key Key to lock the entry for.
+     * @param topVer Topology version.
+     * @return Locked entry, or {@code null} if {@link GridDhtInvalidPartitionException} was raised while
+     *      CLOCK write order mode is configured.
+     * @throws GridDhtInvalidPartitionException If raised while write order mode is not CLOCK.
+     */
+    private GridDhtCacheEntry lockEntry(KeyCacheObject key, AffinityTopologyVersion topVer)
+        throws GridDhtInvalidPartitionException {
+        for (;;) {
+            try {
+                GridDhtCacheEntry candidate = entryExx(key, topVer);
+
+                GridUnsafe.monitorEnter(candidate);
+
+                if (!candidate.obsolete())
+                    return candidate;
+
+                // Obsolete entry: release the monitor and retry.
+                GridUnsafe.monitorExit(candidate);
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                // Invalid partition exception is ignored only in CLOCK ordering mode.
+                boolean clockMode = ctx.config().getAtomicWriteOrderMode() == CLOCK;
+
+                if (!clockMode)
+                    throw e;
+
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Releases java-level lock on a cache entry.
+     * <p>
+     * After the monitor is released the entry is notified via {@code onUnlock()} and, unless it was deleted,
+     * passed to the eviction manager.
+     *
+     * @param entry Locked entry; nothing is done when {@code null}.
+     * @param topVer Topology version.
+     */
+    private void unlockEntry(GridDhtCacheEntry entry, AffinityTopologyVersion topVer) {
+        if (entry == null)
+            return;
+
+        assert ctx.deferredDelete() : this;
+
+        // Deleted flag is read while the lock is still held; a deleted entry skips eviction manager notification.
+        boolean wasDeleted;
+
+        try {
+            wasDeleted = entry.deleted();
+        }
+        finally {
+            GridUnsafe.monitorExit(entry);
+        }
+
+        entry.onUnlock();
+
+        if (wasDeleted)
+            return;
+
+        ctx.evicts().touch(entry, topVer);
+    }
+
     /**
      * Releases java-level locks on cache entries.
      *