You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:53 UTC

[08/10] ignite git commit: tmp

tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3200c2e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3200c2e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3200c2e4

Branch: refs/heads/ignite-4680-sb
Commit: 3200c2e43a9b2cee1d35a19f749de50f19f5d0d4
Parents: 5f51839
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 16:59:26 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 17:46:01 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  36 +--
 .../dht/atomic/GridDhtAtomicCache.java          | 265 +++++++++----------
 .../dht/atomic/NearAtomicResponseHelper.java    |  29 +-
 3 files changed, 151 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6dad30b..17ae595 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -856,23 +856,25 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 //        }
 
 
-        if (plc == GridIoPolicy.SYSTEM_POOL &&
-            (msg.partition() != Integer.MIN_VALUE ||
-                msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {
-            Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
-                ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
-
-            if (stripemap != null) {
-                GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
-
-                msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
-
-                for (Integer stripe : stripemap.keySet()) {
-                    stripedExecutor.execute(stripe, c);
-                }
-            }
-            else
-                stripedExecutor.execute(msg.partition(), c);
+        if (plc == GridIoPolicy.SYSTEM_POOL && (msg.partition() != Integer.MIN_VALUE)) {
+//            Map<Integer, int[]> stripemap = msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE ?
+//                ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
+//
+//            if (stripemap != null) {
+//                GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+//
+//                msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+//
+//                for (Integer stripe : stripemap.keySet()) {
+//                    stripedExecutor.execute(stripe, c);
+//                }
+//            }
+//            else
+
+//            if (msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)
+//                stripedExecutor.execute(((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap().keySet().iterator().next(), c);
+//            else
+            stripedExecutor.execute(msg.partition(), c);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index dcc79d0..bcfea79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -213,14 +213,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (req.writeSynchronizationMode() != FULL_ASYNC) {
-                    if (req.responseHelper() != null) {
-                        GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
-
-                        if (res0 != null)
-                            sendNearUpdateReply(res.nodeId(), res0);
-                    }
-                    else
-                        sendNearUpdateReply(res.nodeId(), res);
+                    sendNearUpdateReply(res.nodeId(), res);
                 }
                 else {
                     if (res.remapTopologyVersion() != null)
@@ -1684,22 +1677,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final int stripeIdx,
         final UpdateReplyClosure completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut;
-
-        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
-            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
-            && req.stripeMap() != null) {
-            int[] stripeIdxs = req.stripeMap().get(stripeIdx);
+//        if (true) {
+//            updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
+//
+//            return;
+//        }
 
-            List<KeyCacheObject> keys = new ArrayList<>(stripeIdxs.length);
-
-            for (int i = 0; i < stripeIdxs.length; i++)
-                keys.add(req.key(stripeIdxs[i]));
-
-            forceFut = preldr.request(keys, req.topologyVersion());
-        }
-        else
-            forceFut = preldr.request(req, req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {
@@ -1715,9 +1699,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 return;
             }
 
-            updateAllAsyncInternal0(nodeId, req, stripeIdx, completionCb);
+            updateAllAsyncInternal0(nodeId, req, ((IgniteThread)Thread.currentThread()).stripe(), completionCb);
         }
         else {
+            if (true)
+                throw new RuntimeException("error");
+
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
                 @Override public void apply(IgniteInternalFuture<Object> fut) {
                     try {
@@ -1761,9 +1748,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         completionCb.apply(req, res);
     }
 
-    private GridCacheVersion ver;
-
-
     /**
      * Executes local update after preloader fetched values.
      *
@@ -1774,11 +1758,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void updateAllAsyncInternal0(
         UUID nodeId,
-        GridNearAtomicAbstractUpdateRequest req,
+        final GridNearAtomicAbstractUpdateRequest req,
         int stripeIdx,
-        UpdateReplyClosure completionCb
+        final UpdateReplyClosure completionCb
     ) {
-        ClusterNode node = ctx.discovery().node(nodeId);
+        final ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null) {
             U.warn(msgLog, "Skip near update request, node originated update request left [" +
@@ -1787,25 +1771,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return;
         }
 
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
-            nodeId,
-            req.futureId(),
-            req.partition(),
-            false,
-            ctx.deploymentEnabled());
-
-        res.partition(req.partition());
-
-        int[] stripeIdxs = null;
-
-        if (stripeIdx != IgniteThread.GRP_IDX_UNASSIGNED
-            && req.directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE
-            && req.stripeMap() != null) {
-            stripeIdxs = req.stripeMap().get(stripeIdx);
-
-            res.stripe(stripeIdx);
-        }
-
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
         GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1830,103 +1795,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (top.stopping()) {
-                        addAllKeysAsFailed(req, res, stripeIdxs, new IgniteCheckedException("Failed to perform cache operation " +
-                            "(cache is stopped): " + name()));
-
-                        completionCb.apply(req, res);
 
                         return;
                     }
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
-
-                        boolean hasNear = ctx.discovery().cacheNearNode(node, name());
-
-                        // Assign next version for update inside entries lock.
-                        if (ver == null)
-                            ver = ctx.versions().next(top.topologyVersion());
-
-                        if (hasNear)
-                            res.nearVersion(ver);
-
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Assigned update version [futId=" + req.futureId() +
-                                ", writeVer=" + ver + ']');
-                        }
-
-                        assert ver != null : "Got null version for update request: " + req;
-
-                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
-
-                        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
-
-                        dhtFut = null;//createDhtFuture(ver, req, size);
-
-                        expiry = expiryPolicy(req.expiry());
+                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                        Map<Integer, int[]> stripemap = req.stripeMap();
 
-                        GridCacheReturn retVal = null;
+                        final GridDhtAtomicAbstractUpdateFuture fut = createDhtFuture(null, req, req.size());
 
-                        if (size > 1 &&                           // Several keys ...
-                            writeThrough() && !req.skipStore() && // and store is enabled ...
-                            !ctx.store().isLocal() &&             // and this is not local store ...
-                                                                  // (conflict resolver should be used for local store)
-                            !ctx.dr().receiveEnabled()            // and no DR.
-                            ) {
-                            // This method can only be used when there are no replicated entries in the batch.
-                            UpdateBatchResult updRes = updateWithBatch(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal,
-                                stripeIdxs);
-
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
-
-                            if (req.operation() == TRANSFORM)
-                                retVal = updRes.invokeResults();
-                        }
-                        else {
-                            UpdateSingleResult updRes = updateSingle(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal,
-                                stripeIdxs);
+                        ((GridNearAtomicFullUpdateRequest)req).responseHelper(new NearAtomicResponseHelper(stripemap.size()));
 
-                            retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
+                        for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
+                            if (stripeIdx == e.getKey())
+                                update(fut, node, req, e.getValue(), completionCb);
+                            else {
+                                ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() {
+                                    @Override public void run() {
+                                        try {
+                                            update(fut, node, req, e.getValue(), completionCb);
+                                        }
+                                        catch (Exception e) {
+                                            e.printStackTrace();
+                                        }
+                                    }
+                                });
+                            }
                         }
-
-                        if (retVal == null)
-                            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
-
-                        res.returnValue(retVal);
-
-                        if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
                     }
                     else {
                         // Should remap all keys.
                         remap = true;
 
-                        res.remapTopologyVersion(top.topologyVersion());
+                        //res.remapTopologyVersion(top.topologyVersion());
                     }
                 }
                 finally {
@@ -1958,16 +1861,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             remap = true;
 
-            res.remapTopologyVersion(ctx.topology().topologyVersion());
+            //res.remapTopologyVersion(ctx.topology().topologyVersion());
         }
         catch (Throwable e) {
             // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            addAllKeysAsFailed(req, res, stripeIdxs, e);
+            //addAllKeysAsFailed(req, res, stripeIdxs, e);
 
-            completionCb.apply(req, res);
+            //completionCb.apply(req, res);
 
             if (e instanceof Error)
                 throw e;
@@ -1975,23 +1878,91 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return;
         }
 
-        if (remap) {
-            assert dhtFut == null;
-            res.stripe(-1);
+//        if (remap) {
+//            assert dhtFut == null;
+//            res.stripe(-1);
+//
+//            completionCb.apply(req, res);
+//        }
+//        else {
+//            if (dhtFut != null)
+//                dhtFut.map(node, res.returnValue(), res, completionCb);
+//            else
+//                completionCb.apply(req, res);
+//        }
+//
+//        if (req.writeSynchronizationMode() != FULL_ASYNC)
+//            req.cleanup(!node.isLocal());
+//
+//        sendTtlUpdateRequest(expiry);
+    }
+
+    private void update(
+        GridDhtAtomicAbstractUpdateFuture fut,
+        ClusterNode node,
+        GridNearAtomicAbstractUpdateRequest req,
+        int[] stripeIdxs,
+        UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            node.id(),
+            req.futureId(),
+            req.partition(),
+            false,
+            ctx.deploymentEnabled());
 
-            completionCb.apply(req, res);
-        }
-        else {
-            if (dhtFut != null)
-                dhtFut.map(node, res.returnValue(), res, completionCb);
-            else
-                completionCb.apply(req, res);
+        List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
+
+        boolean hasNear = ctx.discovery().cacheNearNode(node, name());
+
+        // Assign next version for update inside entries lock.
+        //if (ver == null)
+        GridCacheVersion ver = ctx.versions().next(ctx.topology().topologyVersion());
+
+        if (hasNear)
+            res.nearVersion(ver);
+
+        if (msgLog.isDebugEnabled()) {
+            msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                ", writeVer=" + ver + ']');
         }
 
-        if (req.writeSynchronizationMode() != FULL_ASYNC)
-            req.cleanup(!node.isLocal());
+        assert ver != null : "Got null version for update request: " + req;
+
+        boolean sndPrevVal = false;//!top.rebalanceFinished(req.topologyVersion());
+
+        int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
+
+        GridCacheReturn retVal = null;
+
+        UpdateSingleResult updRes = updateSingle(node,
+            hasNear,
+            req,
+            res,
+            locked,
+            ver,
+            null,
+            ctx.isDrEnabled(),
+            null,
+            null,
+            sndPrevVal,
+            stripeIdxs);
+
+        retVal = updRes.returnValue();
+
+        if (retVal == null)
+            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
 
-        sendTtlUpdateRequest(expiry);
+        res.returnValue(retVal);
+
+        unlockEntries(locked, null);
+
+        GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+
+        if (res0 != null) {
+            fut.onDone();
+
+            completionCb.apply(req, res);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3200c2e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
index 00c9f6c..55c450c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
  *
@@ -28,14 +27,16 @@ public class NearAtomicResponseHelper {
     /** */
     private GridNearAtomicUpdateResponse res;
 
+    private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD =
+        AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt");
+
     /** */
-    private Set<Integer> stripes;
+    private volatile int cnt;
 
     /**
-     * @param stripes Stripes collection.
      */
-    public NearAtomicResponseHelper(Set<Integer> stripes) {
-        this.stripes = new HashSet<>(stripes);
+    public NearAtomicResponseHelper(int cnt) {
+        this.cnt = cnt;
     }
 
     /**
@@ -43,18 +44,16 @@ public class NearAtomicResponseHelper {
      * @return {@code true} if all responses added.
      */
     public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
-        synchronized (this) {
-            if (res.stripe() == -1)
-                return res;
+        this.res = res;
 
-            if (stripes.remove(res.stripe())) {
-                mergeResponse(res);
+        int c = UPD.decrementAndGet(this);
 
-                return stripes.isEmpty() ? this.res : null;
-            }
+        //mergeResponse(res);
 
-            return null;
-        }
+        if (c == 0)
+            return this.res;
+
+        return null;
     }
 
     /**