You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/04 10:21:25 UTC

[5/6] ignite git commit: IGNITE-2745: Avoid CHM lookup for listeners on default GridTopic's.

IGNITE-2745: Avoid CHM lookup for listeners on default GridTopic's.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c4cfb68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c4cfb68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c4cfb68

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: 1c4cfb6867f0b8687b6def4087221822c52a0f7b
Parents: 7c5db21
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 12:19:53 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 12:19:53 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 183 +++++++++++++++++--
 1 file changed, 172 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1c4cfb68/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 36be9ec..232ec2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -116,6 +117,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
 
+    /** System listeners, indexed by {@code GridTopic} ordinal. */
+    private volatile GridMessageListener[] sysLsnrs;
+
+    /** Mutex for system listeners. */
+    private final Object sysLsnrsMux = new Object();
+
     /** Disconnect listeners. */
     private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
 
@@ -201,6 +208,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         discoDelay = ctx.config().getDiscoveryStartupDelay();
 
         marsh = ctx.config().getMarshaller();
+
+        synchronized (sysLsnrsMux) {
+            sysLsnrs = new GridMessageListener[GridTopic.values().length];
+        }
     }
 
     /**
@@ -733,7 +744,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 try {
                     threadProcessingMessage(true);
 
-                    GridMessageListener lsnr = lsnrMap.get(msg.topic());
+                    GridMessageListener lsnr = listenerGet0(msg.topic());
 
                     if (lsnr == null)
                         return;
@@ -810,7 +821,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     @SuppressWarnings("deprecation")
     private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
-        GridMessageListener lsnr = lsnrMap.get(msg.topic());
+        GridMessageListener lsnr = listenerGet0(msg.topic());
 
         if (lsnr == null)
             return;
@@ -823,6 +834,156 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * Get listener: array slot for a {@code GridTopic}, {@code lsnrMap} entry for any other topic.
+     *
+     * @param topic Topic.
+     * @return Listener, or {@code null} if none is registered for the topic.
+     */
+    @Nullable private GridMessageListener listenerGet0(Object topic) {
+        if (topic instanceof GridTopic)
+            return sysLsnrs[systemListenerIndex(topic)];
+        else
+            return lsnrMap.get(topic);
+    }
+
+    /**
+     * Put listener if no listener is registered for the topic yet.
+     *
+     * @param topic Topic.
+     * @param lsnr Listener to register.
+     * @return Already registered listener, or {@code null} if the given listener was put.
+     */
+    @Nullable private GridMessageListener listenerPutIfAbsent0(Object topic, GridMessageListener lsnr) {
+        if (topic instanceof GridTopic) {
+            synchronized (sysLsnrsMux) {
+                int idx = systemListenerIndex(topic);
+
+                GridMessageListener old = sysLsnrs[idx];
+
+                if (old == null)
+                    changeSystemListener(idx, lsnr);
+
+                return old;
+            }
+        }
+        else
+            return lsnrMap.putIfAbsent(topic, lsnr);
+    }
+
+    /**
+     * Remove whatever listener is currently registered for the topic.
+     *
+     * @param topic Topic.
+     * @return Removed listener, or {@code null} if nothing was registered.
+     */
+    @Nullable private GridMessageListener listenerRemove0(Object topic) {
+        if (topic instanceof GridTopic) {
+            synchronized (sysLsnrsMux) {
+                int idx = systemListenerIndex(topic);
+
+                GridMessageListener old = sysLsnrs[idx];
+
+                if (old != null)
+                    changeSystemListener(idx, null);
+
+                return old;
+            }
+        }
+        else
+            return lsnrMap.remove(topic);
+    }
+
+    /**
+     * Remove listener only if the currently registered one equals the expected value.
+     *
+     * @param topic Topic.
+     * @param expected Listener expected to be registered.
+     * @return {@code True} if the listener was removed.
+     */
+    private boolean listenerRemove0(Object topic, GridMessageListener expected) {
+        if (topic instanceof GridTopic) {
+            synchronized (sysLsnrsMux) {
+                return systemListenerChange(topic, expected, null);
+            }
+        }
+        else
+            return lsnrMap.remove(topic, expected);
+    }
+
+    /**
+     * Replace listener only if the currently registered one equals the expected value.
+     *
+     * @param topic Topic.
+     * @param expected Listener expected to be registered.
+     * @param newVal Listener to register instead.
+     * @return {@code True} if the listener was replaced.
+     */
+    private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+        if (topic instanceof GridTopic) {
+            synchronized (sysLsnrsMux) {
+                return systemListenerChange(topic, expected, newVal);
+            }
+        }
+        else
+            return lsnrMap.replace(topic, expected, newVal);
+    }
+
+    /**
+     * Change system listener if the current one equals expected; must be called under {@code sysLsnrsMux}.
+     *
+     * @param topic Topic, must be a {@code GridTopic}.
+     * @param expected Expected value.
+     * @param newVal New value, {@code null} to clear the slot.
+     * @return {@code True} if the listener was changed.
+     */
+    private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+        assert Thread.holdsLock(sysLsnrsMux);
+        assert topic instanceof GridTopic;
+
+        int idx = systemListenerIndex(topic);
+
+        GridMessageListener old = sysLsnrs[idx];
+
+        if (old != null && old.equals(expected)) {
+            changeSystemListener(idx, newVal);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Change system listener at the given index by assigning a modified copy of the array to the field.
+     *
+     * @param idx Index.
+     * @param lsnr New listener, {@code null} to clear the slot.
+     */
+    private void changeSystemListener(int idx, @Nullable GridMessageListener lsnr) {
+        assert Thread.holdsLock(sysLsnrsMux);
+
+        GridMessageListener[] res = new GridMessageListener[sysLsnrs.length];
+
+        System.arraycopy(sysLsnrs, 0, res, 0, sysLsnrs.length);
+
+        res[idx] = lsnr;
+
+        sysLsnrs = res;
+    }
+
+    /**
+     * Get index of a system listener, which is the ordinal of its {@code GridTopic}.
+     *
+     * @param topic Topic, must be a {@code GridTopic}.
+     * @return Index.
+     */
+    private int systemListenerIndex(Object topic) {
+        assert topic instanceof GridTopic;
+
+        return ((GridTopic)topic).ordinal();
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Ordered message.
      * @param plc Execution policy.
@@ -928,7 +1089,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (isNew && set.endTime() != Long.MAX_VALUE)
             ctx.timeout().addTimeoutObject(set);
 
-        final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+        final GridMessageListener lsnr = listenerGet0(msg.topic());
 
         if (lsnr == null) {
             if (closedTopics.contains(msg.topic())) {
@@ -1537,7 +1698,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         GridMessageListener lsnrs;
 
         for (;;) {
-            lsnrs = lsnrMap.putIfAbsent(topic, lsnr);
+            lsnrs = listenerPutIfAbsent0(topic, lsnr);
 
             if (lsnrs == null) {
                 lsnrs = lsnr;
@@ -1550,7 +1711,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array.
                 GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr);
 
-                if (lsnrMap.replace(topic, lsnrs, arrLsnr)) {
+                if (listenerReplace0(topic, lsnrs, arrLsnr)) {
                     lsnrs = arrLsnr;
 
                     break;
@@ -1561,7 +1722,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     break;
 
                 // Add operation failed because array is already empty and is about to be removed, helping and retrying.
-                lsnrMap.remove(topic, lsnrs);
+                listenerRemove0(topic, lsnrs);
             }
         }
 
@@ -1639,7 +1800,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (lsnr == null) {
             closedTopics.add(topic);
 
-            lsnr = lsnrMap.remove(topic);
+            lsnr = listenerRemove0(topic);
 
             rmv = lsnr != null;
 
@@ -1650,7 +1811,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
         else {
             for (;;) {
-                GridMessageListener lsnrs = lsnrMap.get(topic);
+                GridMessageListener lsnrs = listenerGet0(topic);
 
                 // If removing listener before subscription happened.
                 if (lsnrs == null) {
@@ -1670,7 +1831,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                     if (!(lsnrs instanceof ArrayListener)) {
                         if (lsnrs.equals(lsnr)) {
-                            if (!lsnrMap.remove(topic, lsnrs))
+                            if (!listenerRemove0(topic, lsnrs))
                                 continue; // Retry because it can be packed to array listener.
 
                             empty = true;
@@ -1688,7 +1849,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                             rmv = false;
 
                         if (empty)
-                            lsnrMap.remove(topic, lsnrs);
+                            listenerRemove0(topic, lsnrs);
                     }
 
                     // If removing last subscribed listener.
@@ -2132,7 +2293,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         /** {@inheritDoc} */
         @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
         @Override public void onTimeout() {
-            GridMessageListener lsnr = lsnrMap.get(topic);
+            GridMessageListener lsnr = listenerGet0(topic);
 
             if (lsnr != null) {
                 long delta = 0;