You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/17 15:04:52 UTC

[07/10] ignite git commit: tmp

tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f518395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f518395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f518395

Branch: refs/heads/ignite-4680-sb
Commit: 5f51839525a839c1eec9c28aa7772cc9f1bc59c1
Parents: 3e7ee08
Author: sboikov <sb...@gridgain.com>
Authored: Fri Mar 17 15:54:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Mar 17 15:54:56 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 33 ++++++++++++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 15 ++++++---
 .../atomic/GridNearAtomicFullUpdateRequest.java |  1 +
 .../apache/ignite/internal/util/MPSCQueue.java  |  8 ++---
 4 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 39c514b..6dad30b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,11 +55,13 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.MPSCQueue;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -201,6 +203,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
     };
 
+    private Thread resThread;
+
+    private MPSCQueue<Runnable> q;
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -221,6 +227,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         synchronized (sysLsnrsMux) {
             sysLsnrs = new GridMessageListener[GridTopic.values().length];
         }
+
+        resThread = new Thread() {
+            public void run() {
+                while (true) {
+                    try {
+                        Runnable r = q.take();
+
+                        r.run();
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+
+        q = new MPSCQueue<>(resThread);
+
+        resThread.setDaemon(true);
+        resThread.start();
     }
 
     /**
@@ -823,6 +849,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return;
         }
 
+//        if (msg.message() instanceof GridNearAtomicUpdateResponse) {
+//            q.add(c);
+//
+//            return;
+//        }
+
+
         if (plc == GridIoPolicy.SYSTEM_POOL &&
             (msg.partition() != Integer.MIN_VALUE ||
                 msg.message().directType() == GridNearAtomicFullUpdateRequest.DIRECT_TYPE)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 973256f..dcc79d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1761,6 +1761,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         completionCb.apply(req, res);
     }
 
+    private GridCacheVersion ver;
+
+
     /**
      * Executes local update after preloader fetched values.
      *
@@ -1837,13 +1840,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+                    if (true || req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
 
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
                         // Assign next version for update inside entries lock.
-                        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+                        if (ver == null)
+                            ver = ctx.versions().next(top.topologyVersion());
 
                         if (hasNear)
                             res.nearVersion(ver);
@@ -1859,7 +1863,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         int size = stripeIdxs == null ? req.size() : stripeIdxs.length;
 
-                        dhtFut = createDhtFuture(ver, req, size);
+                        dhtFut = null;//createDhtFuture(ver, req, size);
 
                         expiry = expiryPolicy(req.expiry());
 
@@ -1977,9 +1981,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
         }
-        else
+        else {
             if (dhtFut != null)
                 dhtFut.map(node, res.returnValue(), res, completionCb);
+            else
+                completionCb.apply(req, res);
+        }
 
         if (req.writeSynchronizationMode() != FULL_ASYNC)
             req.cleanup(!node.isLocal());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 2e619ee..ce6035e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -406,6 +406,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
     /** {@inheritDoc} */
     @Override @Nullable public Map<Integer, int[]> stripeMap() {
+        //stripeMap = null;
         return stripeMap;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f518395/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
index 5505b3a..5725390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -41,7 +41,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     /** */
     final AtomicReference<Node> putStack = new AtomicReference<>();
     /** */
-    private final AtomicInteger takeStackSize = new AtomicInteger();
+    //private final AtomicInteger takeStackSize = new AtomicInteger();
 
     /** */
     private Thread consumerThread;
@@ -189,7 +189,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     private void dequeue() {
         takeStack[takeStackIndex] = null;
         takeStackIndex++;
-        takeStackSize.lazySet(takeStackSize.get() - 1);
+        //takeStackSize.lazySet(takeStackSize.get() - 1);
     }
 
     /** */
@@ -248,7 +248,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     private void copyIntoTakeStack(Node putStackHead) {
         int putStackSize = putStackHead.size;
 
-        takeStackSize.lazySet(putStackSize);
+        //takeStackSize.lazySet(putStackSize);
 
         if (putStackSize > takeStack.length)
             takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
@@ -270,7 +270,7 @@ public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueu
     @Override public int size() {
         Node h = putStack.get();
         int putStackSize = h == null ? 0 : h.size;
-        return putStackSize + takeStackSize.get();
+        return putStackSize + 0;//takeStackSize.get();
     }
 
     /** {@inheritDoc}. */