You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/09 10:27:55 UTC
[40/50] [abbrv] ignite git commit: IGNITE-2745: Avoid CHM lookup for
listeners on default GridTopic's.
IGNITE-2745: Avoid CHM lookup for listeners on default GridTopic's.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c4cfb68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c4cfb68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c4cfb68
Branch: refs/heads/ignite-1786
Commit: 1c4cfb6867f0b8687b6def4087221822c52a0f7b
Parents: 7c5db21
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 4 12:19:53 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 4 12:19:53 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 183 +++++++++++++++++--
1 file changed, 172 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c4cfb68/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 36be9ec..232ec2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -116,6 +117,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Listeners by topic. */
private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
+ /** System listeners. */
+ private volatile GridMessageListener[] sysLsnrs;
+
+ /** Mutex for system listeners. */
+ private final Object sysLsnrsMux = new Object();
+
/** Disconnect listeners. */
private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
@@ -201,6 +208,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
discoDelay = ctx.config().getDiscoveryStartupDelay();
marsh = ctx.config().getMarshaller();
+
+ synchronized (sysLsnrsMux) {
+ sysLsnrs = new GridMessageListener[GridTopic.values().length];
+ }
}
/**
@@ -733,7 +744,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
try {
threadProcessingMessage(true);
- GridMessageListener lsnr = lsnrMap.get(msg.topic());
+ GridMessageListener lsnr = listenerGet0(msg.topic());
if (lsnr == null)
return;
@@ -810,7 +821,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
@SuppressWarnings("deprecation")
private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
- GridMessageListener lsnr = lsnrMap.get(msg.topic());
+ GridMessageListener lsnr = listenerGet0(msg.topic());
if (lsnr == null)
return;
@@ -823,6 +834,156 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * Looks up the listener currently registered for the given topic.
+ * <p>
+ * A {@link GridTopic} topic is resolved by indexing {@code sysLsnrs} with the value returned by
+ * {@code systemListenerIndex(topic)}; this method takes no lock on that path. Any other topic
+ * is looked up in {@code lsnrMap}.
+ *
+ * @param topic Topic.
+ * @return Listener registered for the topic, or {@code null} if there is none.
+ */
+ @Nullable private GridMessageListener listenerGet0(Object topic) {
+ if (topic instanceof GridTopic)
+ return sysLsnrs[systemListenerIndex(topic)];
+ else
+ return lsnrMap.get(topic);
+ }
+
+ /**
+ * Registers the listener for the topic only if no listener is registered for it yet.
+ * <p>
+ * For a {@link GridTopic} topic the check and the update are both performed while holding
+ * {@code sysLsnrsMux}. For any other topic the call is delegated to
+ * {@code lsnrMap.putIfAbsent(topic, lsnr)}.
+ *
+ * @param topic Topic.
+ * @param lsnr Listener to register.
+ * @return Listener that was already registered (in which case nothing is changed), or {@code null}
+ *      if there was none and {@code lsnr} has been registered.
+ */
+ @Nullable private GridMessageListener listenerPutIfAbsent0(Object topic, GridMessageListener lsnr) {
+ if (topic instanceof GridTopic) {
+ synchronized (sysLsnrsMux) {
+ int idx = systemListenerIndex(topic);
+
+ GridMessageListener old = sysLsnrs[idx];
+
+ if (old == null)
+ changeSystemListener(idx, lsnr);
+
+ return old;
+ }
+ }
+ else
+ return lsnrMap.putIfAbsent(topic, lsnr);
+ }
+
+ /**
+ * Removes whatever listener is registered for the topic, without comparing it to an expected value.
+ * <p>
+ * For a {@link GridTopic} topic the slot is read and, if non-null, cleared while holding
+ * {@code sysLsnrsMux}. For any other topic the call is delegated to {@code lsnrMap.remove(topic)}.
+ *
+ * @param topic Topic.
+ * @return Removed listener, or {@code null} if no listener was registered for the topic.
+ */
+ @Nullable private GridMessageListener listenerRemove0(Object topic) {
+ if (topic instanceof GridTopic) {
+ synchronized (sysLsnrsMux) {
+ int idx = systemListenerIndex(topic);
+
+ GridMessageListener old = sysLsnrs[idx];
+
+ if (old != null)
+ changeSystemListener(idx, null);
+
+ return old;
+ }
+ }
+ else
+ return lsnrMap.remove(topic);
+ }
+
+ /**
+ * Removes the listener registered for the topic only if it matches the expected value.
+ * <p>
+ * For a {@link GridTopic} topic this calls {@code systemListenerChange(topic, expected, null)}
+ * while holding {@code sysLsnrsMux}. For any other topic the call is delegated to
+ * {@code lsnrMap.remove(topic, expected)}.
+ *
+ * @param topic Topic.
+ * @param expected Listener that is expected to be currently registered.
+ * @return {@code True} if the listener was removed, {@code false} otherwise.
+ */
+ private boolean listenerRemove0(Object topic, GridMessageListener expected) {
+ if (topic instanceof GridTopic) {
+ synchronized (sysLsnrsMux) {
+ return systemListenerChange(topic, expected, null);
+ }
+ }
+ else
+ return lsnrMap.remove(topic, expected);
+ }
+
+ /**
+ * Replaces the listener registered for the topic only if it matches the expected value.
+ * <p>
+ * For a {@link GridTopic} topic this calls {@code systemListenerChange(topic, expected, newVal)}
+ * while holding {@code sysLsnrsMux}. For any other topic the call is delegated to
+ * {@code lsnrMap.replace(topic, expected, newVal)}.
+ *
+ * @param topic Topic.
+ * @param expected Listener that is expected to be currently registered.
+ * @param newVal Listener to register instead of {@code expected}.
+ * @return {@code True} if the listener was replaced, {@code false} otherwise.
+ */
+ private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+ if (topic instanceof GridTopic) {
+ synchronized (sysLsnrsMux) {
+ return systemListenerChange(topic, expected, newVal);
+ }
+ }
+ else
+ return lsnrMap.replace(topic, expected, newVal);
+ }
+
+ /**
+ * Sets the system listener slot of the given topic to {@code newVal}, but only if the slot
+ * currently holds a non-null listener that {@code equals(expected)}.
+ * <p>
+ * The method asserts that the calling thread holds {@code sysLsnrsMux} and that the topic is
+ * a {@link GridTopic}.
+ *
+ * @param topic Topic.
+ * @param expected Listener that is expected to be currently registered.
+ * @param newVal New value; {@code listenerRemove0(Object, GridMessageListener)} passes {@code null}
+ *      here to clear the slot.
+ * @return {@code True} if the slot was changed, {@code false} if it was empty or held a different listener.
+ */
+ private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+ assert Thread.holdsLock(sysLsnrsMux);
+ assert topic instanceof GridTopic;
+
+ int idx = systemListenerIndex(topic);
+
+ GridMessageListener old = sysLsnrs[idx];
+
+ if (old != null && old.equals(expected)) {
+ changeSystemListener(idx, newVal);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Changes the system listener at the given index.
+ * <p>
+ * The array referenced by {@code sysLsnrs} is not modified in place: a new array of the same
+ * length is allocated, the current contents are copied into it, the slot at {@code idx} is set
+ * and the new array is then assigned to the volatile {@code sysLsnrs} field.
+ * <p>
+ * The method asserts that the calling thread holds {@code sysLsnrsMux}.
+ *
+ * @param idx Index.
+ * @param lsnr Listener to store at the index; {@code null} clears the slot.
+ */
+ private void changeSystemListener(int idx, @Nullable GridMessageListener lsnr) {
+ assert Thread.holdsLock(sysLsnrsMux);
+
+ GridMessageListener[] res = new GridMessageListener[sysLsnrs.length];
+
+ System.arraycopy(sysLsnrs, 0, res, 0, sysLsnrs.length);
+
+ res[idx] = lsnr;
+
+ sysLsnrs = res;
+ }
+
+ /**
+ * Gets the index of a system listener in {@code sysLsnrs}, which is the {@code ordinal()} of
+ * the topic cast to {@link GridTopic}.
+ * <p>
+ * The method asserts that the topic is a {@link GridTopic}; with assertions disabled any other
+ * argument fails on the cast instead.
+ *
+ * @param topic Topic.
+ * @return Index.
+ */
+ private int systemListenerIndex(Object topic) {
+ assert topic instanceof GridTopic;
+
+ return ((GridTopic)topic).ordinal();
+ }
+
+ /**
* @param nodeId Node ID.
* @param msg Ordered message.
* @param plc Execution policy.
@@ -928,7 +1089,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (isNew && set.endTime() != Long.MAX_VALUE)
ctx.timeout().addTimeoutObject(set);
- final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+ final GridMessageListener lsnr = listenerGet0(msg.topic());
if (lsnr == null) {
if (closedTopics.contains(msg.topic())) {
@@ -1537,7 +1698,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
GridMessageListener lsnrs;
for (;;) {
- lsnrs = lsnrMap.putIfAbsent(topic, lsnr);
+ lsnrs = listenerPutIfAbsent0(topic, lsnr);
if (lsnrs == null) {
lsnrs = lsnr;
@@ -1550,7 +1711,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array.
GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr);
- if (lsnrMap.replace(topic, lsnrs, arrLsnr)) {
+ if (listenerReplace0(topic, lsnrs, arrLsnr)) {
lsnrs = arrLsnr;
break;
@@ -1561,7 +1722,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
break;
// Add operation failed because array is already empty and is about to be removed, helping and retrying.
- lsnrMap.remove(topic, lsnrs);
+ listenerRemove0(topic, lsnrs);
}
}
@@ -1639,7 +1800,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (lsnr == null) {
closedTopics.add(topic);
- lsnr = lsnrMap.remove(topic);
+ lsnr = listenerRemove0(topic);
rmv = lsnr != null;
@@ -1650,7 +1811,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
else {
for (;;) {
- GridMessageListener lsnrs = lsnrMap.get(topic);
+ GridMessageListener lsnrs = listenerGet0(topic);
// If removing listener before subscription happened.
if (lsnrs == null) {
@@ -1670,7 +1831,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (!(lsnrs instanceof ArrayListener)) {
if (lsnrs.equals(lsnr)) {
- if (!lsnrMap.remove(topic, lsnrs))
+ if (!listenerRemove0(topic, lsnrs))
continue; // Retry because it can be packed to array listener.
empty = true;
@@ -1688,7 +1849,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
rmv = false;
if (empty)
- lsnrMap.remove(topic, lsnrs);
+ listenerRemove0(topic, lsnrs);
}
// If removing last subscribed listener.
@@ -2132,7 +2293,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** {@inheritDoc} */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override public void onTimeout() {
- GridMessageListener lsnr = lsnrMap.get(topic);
+ GridMessageListener lsnr = listenerGet0(topic);
if (lsnr != null) {
long delta = 0;