You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/12 06:05:34 UTC
[2/3] ignite git commit: ignite-5727 Call TcpCommunicationSpi's discovery listener first
ignite-5727 Call TcpCommunicationSpi's discovery listener first
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b95f76f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b95f76f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b95f76f8
Branch: refs/heads/ignite-5578-locJoin
Commit: b95f76f8a0a3a7e920f78f20b3d814112fc6d522
Parents: 5c36318
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 12 08:47:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 12 08:47:04 2017 +0300
----------------------------------------------------------------------
.../eventstorage/GridEventStorageManager.java | 309 ++++++++++---------
.../eventstorage/HighPriorityListener.java | 28 ++
.../processors/cache/GridCacheMvccManager.java | 5 -
.../processors/cache/GridCacheProcessor.java | 2 -
.../continuous/GridContinuousProcessor.java | 91 +++---
.../communication/tcp/TcpCommunicationSpi.java | 30 +-
6 files changed, 269 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 1714cbb..944420f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.EventListener;
import java.util.HashSet;
import java.util.Iterator;
@@ -83,10 +84,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
*/
public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> {
/** Local event listeners. */
- private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new ConcurrentHashMap8<>();
-
- /** Internal discovery listeners. */
- private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Integer, Listeners> lsnrs = new ConcurrentHashMap8<>();
/** Busy lock to control activity of threads. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -208,8 +206,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
@Override public void printMemoryStats() {
int lsnrsCnt = 0;
- for (Set<EventListener> lsnrs0 : lsnrs.values())
- lsnrsCnt += lsnrs0.size();
+ for (Listeners lsnrs0 : lsnrs.values())
+ lsnrsCnt += lsnrs0.lsnrs.size();
X.println(">>>");
X.println(">>> Event storage manager memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']');
@@ -250,9 +248,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
try {
if (msgLsnr != null)
- ctx.io().removeMessageListener(
- TOPIC_EVENT,
- msgLsnr);
+ ctx.io().removeMessageListener(TOPIC_EVENT, msgLsnr);
msgLsnr = null;
@@ -332,13 +328,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
// Override user recordable settings for daemon node.
- if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type))
+ if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type)) {
try {
getSpi().record(evt);
}
catch (IgniteSpiException e) {
U.error(log, "Failed to record event: " + evt, e);
}
+ }
if (isRecordable(type))
notifyListeners(lsnrs.get(evt.type()), evt, params);
@@ -669,17 +666,13 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param lsnr Listener to add.
* @param types Event types to subscribe listener for.
*/
- private void addEventListener(EventListener lsnr, int[] types) {
+ private void addEventListener(ListenerWrapper lsnr, int[] types) {
if (!enterBusy())
return;
try {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
+ for (int t : types)
+ registerListener(lsnr, t);
}
finally {
leaveBusy();
@@ -693,23 +686,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param type Event type to subscribe listener for.
* @param types Additional event types to subscribe listener for.
*/
- private void addEventListener(EventListener lsnr, int type, @Nullable int... types) {
+ private void addEventListener(ListenerWrapper lsnr, int type, @Nullable int... types) {
if (!enterBusy())
return;
try {
- getOrCreate(lsnrs, type).add(lsnr);
-
- if (!isRecordable(type))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
+ registerListener(lsnr, type);
if (types != null) {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
+ for (int t : types)
+ registerListener(lsnr, t);
}
}
finally {
@@ -718,25 +704,25 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * @param lsnrs Listeners map.
+ * @param lsnr Listener.
* @param type Event type.
- * @return Listeners for given event type.
*/
- private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) {
- Set<T> set = lsnrs.get(type);
+ private void registerListener(ListenerWrapper lsnr, Integer type) {
+ Listeners lsnrs0 = lsnrs.get(type);
- if (set == null) {
- set = new GridConcurrentLinkedHashSet<>();
+ if (lsnrs0 == null) {
+ lsnrs0 = new Listeners();
- Set<T> prev = lsnrs.putIfAbsent(type, set);
+ Listeners prev = lsnrs.putIfAbsent(type, lsnrs0);
if (prev != null)
- set = prev;
+ lsnrs0 = prev;
}
- assert set != null;
+ lsnrs0.addListener(lsnr);
- return set;
+ if (!isRecordable(type))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
}
/**
@@ -789,29 +775,29 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param types Event types.
* @return Returns {@code true} if removed.
*/
- private boolean removeEventListener(EventListener lsnr, @Nullable int[] types) {
+ private boolean removeEventListener(ListenerWrapper lsnr, @Nullable int[] types) {
assert lsnr != null;
boolean found = false;
if (F.isEmpty(types)) {
- for (Set<EventListener> set : lsnrs.values())
- if (set.remove(lsnr))
+ for (Listeners set : lsnrs.values()) {
+ if (set.removeListener(lsnr))
found = true;
+ }
}
else {
assert types != null;
for (int type : types) {
- Set<EventListener> set = lsnrs.get(type);
+ Listeners set = lsnrs.get(type);
- if (set != null && set.remove(lsnr))
+ if (set != null && set.removeListener(lsnr))
found = true;
}
}
- if (lsnr instanceof UserListenerWrapper)
- {
+ if (lsnr instanceof UserListenerWrapper) {
IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
if (p instanceof PlatformEventFilterListener)
@@ -845,96 +831,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- *
- * @param timeout Timeout.
- * @param c Optional continuation.
- * @param p Optional predicate.
- * @param types Event types to wait for.
- * @return Event.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public Event waitForEvent(long timeout, @Nullable Runnable c,
- @Nullable final IgnitePredicate<? super Event> p, int... types) throws IgniteCheckedException {
- assert timeout >= 0;
-
- final GridFutureAdapter<Event> fut = new GridFutureAdapter<>();
-
- addLocalEventListener(new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- if (p == null || p.apply(evt)) {
- fut.onDone(evt);
-
- removeLocalEventListener(this);
- }
- }
- }, types);
-
- try {
- if (c != null)
- c.run();
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
-
- return fut.get(timeout);
- }
-
- /**
- * @param set Set of listeners.
+ * @param lsnrs Listeners registered for the event type (high priority and regular), may be {@code null}.
* @param evt Grid event.
+ * @param params Event parameters.
*/
- private void notifyListeners(@Nullable Collection<EventListener> set, Event evt, Object[] params) {
+ private void notifyListeners(@Nullable Listeners lsnrs, Event evt, Object[] params) {
assert evt != null;
- if (!F.isEmpty(set)) {
- assert set != null;
-
- for (EventListener lsnr : set) {
- try {
- ((ListenerWrapper)lsnr).onEvent(evt, params);
- }
- catch (Throwable e) {
- U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
+ if (lsnrs != null) {
+ notifyListeners(lsnrs.highPriorityLsnrs, evt, params);
- if (e instanceof Error)
- throw (Error)e;
- }
- }
+ notifyListeners(lsnrs.lsnrs, evt, params);
}
}
/**
- * @param evt Discovery event
- * @param cache Discovery cache.
+ * @param lsnrs Listeners collection.
+ * @param evt Event.
+ * @param params Event parameters.
*/
- private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
- }
-
- /**
- * @param set Set of listeners.
- * @param evt Discovery event.
- * @param cache Discovery cache.
- */
- private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- if (!F.isEmpty(set)) {
- assert set != null;
+ private void notifyListeners(@Nullable Collection<ListenerWrapper> lsnrs, Event evt, Object[] params) {
+ if (lsnrs == null || lsnrs.isEmpty())
+ return;
- for (DiscoveryEventListener lsnr : set) {
- try {
- lsnr.onEvent(evt, cache);
- }
- catch (Throwable e) {
- U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
+ for (EventListener lsnr : lsnrs) {
+ try {
+ ((ListenerWrapper)lsnr).onEvent(evt, params);
+ }
+ catch (Throwable e) {
+ U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
- if (e instanceof Error)
- throw (Error)e;
- }
+ if (e instanceof Error)
+ throw (Error)e;
}
}
}
@@ -1208,16 +1136,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * @param arr Array.
- * @return Array copy.
- */
- private boolean[] copy(boolean[] arr) {
- assert arr != null;
-
- return Arrays.copyOf(arr, arr.length);
- }
-
- /**
*
*/
private class RequestListener implements GridMessageListener {
@@ -1329,9 +1247,98 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
}
- /** */
+ /**
+ * Listeners registered for one event type: ordered high priority listeners plus regular listeners.
+ */
+ static class Listeners {
+ /** */
+ static Comparator<ListenerWrapper> ORDERED_CMP = new Comparator<ListenerWrapper>() {
+ @Override public int compare(ListenerWrapper lsnr1, ListenerWrapper lsnr2) {
+ int o1 = ((HighPriorityListener)lsnr1.listener()).order();
+ int o2 = ((HighPriorityListener)lsnr2.listener()).order();
+
+ return Integer.compare(o1, o2);
+ }
+ };
+
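+ /** High priority listeners sorted by order; replaced by a new list on every change, {@code null} until the first is added. */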
+ private volatile List<ListenerWrapper> highPriorityLsnrs;
+
+ /** */
+ private final Set<ListenerWrapper> lsnrs = new GridConcurrentLinkedHashSet<>();
+
+ /**
+ * @param lsnr Listener to add.
+ */
+ void addListener(ListenerWrapper lsnr) {
+ if (lsnr.highPriority()) {
+ synchronized (this) {
+ List<ListenerWrapper> curLsnrs = highPriorityLsnrs;
+ List<ListenerWrapper> newLsnrs = new ArrayList<>();
+
+ if (curLsnrs != null)
+ newLsnrs.addAll(curLsnrs);
+
+ assert !newLsnrs.contains(lsnr) : lsnr;
+
+ newLsnrs.add(lsnr);
+
+ Collections.sort(newLsnrs, ORDERED_CMP);
+
+ highPriorityLsnrs = newLsnrs;
+ }
+ }
+ else
+ lsnrs.add(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener to remove.
+ * @return {@code True} if the listener was registered and has been removed.
+ */
+ boolean removeListener(ListenerWrapper lsnr) {
+ if (lsnr.highPriority()) {
+ synchronized (this) {
+ List<ListenerWrapper> curLsnrs = highPriorityLsnrs;
+
+ if (curLsnrs == null)
+ return false;
+
+ List<ListenerWrapper> newLsnrs = new ArrayList<>(curLsnrs);
+
+ if (newLsnrs.remove(lsnr)) {
+ highPriorityLsnrs = newLsnrs;
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+ else
+ return lsnrs.remove(lsnr);
+ }
+ }
+
+ /**
+ *
+ */
private abstract static class ListenerWrapper implements EventListener {
+ /**
+ * @param evt Event.
+ * @param params Parameters.
+ */
abstract void onEvent(Event evt, Object[] params);
+
+ /**
+ * @return Wrapped listener.
+ */
+ abstract Object listener();
+
+ /**
+ * @return {@code True} if high priority listener.
+ */
+ abstract boolean highPriority();
}
/**
@@ -1349,6 +1356,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/** {@inheritDoc} */
+ @Override EventListener listener() {
+ return lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean highPriority() {
+ return lsnr instanceof HighPriorityListener;
+ }
+
+ /** {@inheritDoc} */
@Override void onEvent(Event evt, Object[] params) {
lsnr.onEvent(evt);
}
@@ -1387,8 +1404,18 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/** {@inheritDoc} */
+ @Override EventListener listener() {
+ return lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean highPriority() {
+ return lsnr instanceof HighPriorityListener;
+ }
+
+ /** {@inheritDoc} */
@Override void onEvent(Event evt, Object[] params) {
- // No checks there since only DiscoveryManager produses DiscoveryEvents
+ // No checks there since only DiscoveryManager produces DiscoveryEvents
// and it uses an overloaded method with additional parameters
lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]);
}
@@ -1426,10 +1453,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
this.lsnr = (IgnitePredicate<Event>)lsnr;
}
- /**
- * @return User listener.
- */
- private IgnitePredicate<? extends Event> listener() {
+ /** {@inheritDoc} */
+ public IgnitePredicate<? extends Event> listener() {
return lsnr;
}
@@ -1450,12 +1475,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
UserListenerWrapper that = (UserListenerWrapper)o;
return lsnr.equals(that.lsnr);
-
}
/** {@inheritDoc} */
@Override public int hashCode() {
return lsnr.hashCode();
}
+
+ /** {@inheritDoc} */
+ @Override boolean highPriority() {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
new file mode 100644
index 0000000..c55aa8d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.eventstorage;
+
+/**
+ * Interface for listeners which are notified before 'regular' listeners.
+ */
+public interface HighPriorityListener {
+ /**
+ * @return Order among high priority listeners (listeners with lower order are notified first).
+ */
+ public int order();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a6907b9..b156708 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -272,12 +272,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
exchLog = cctx.logger(getClass().getName() + ".exchange");
pendingExplicit = GridConcurrentFactory.newMap();
- }
- /**
- * Cache futures listener must be registered after communication listener.
- */
- public void registerEventListener() {
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0488a14..9cedac6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -838,8 +838,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.query().onCacheKernalStart();
- sharedCtx.mvcc().registerEventListener();
-
sharedCtx.exchange().onKernalStart(active, false);
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 8b9b277..7062353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -161,46 +162,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
marsh = ctx.config().getMarshaller();
- ctx.event().addLocalEventListener(new GridLocalEventListener() {
- @SuppressWarnings({"fallthrough", "TooBroadScope"})
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
- UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
-
- clientInfos.remove(nodeId);
-
- // Unregister handlers created by left node.
- for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
- UUID routineId = e.getKey();
- RemoteRoutineInfo info = e.getValue();
-
- if (nodeId.equals(info.nodeId)) {
- if (info.autoUnsubscribe)
- unregisterRemote(routineId);
-
- if (info.hnd.isQuery())
- info.hnd.onNodeLeft();
- }
- }
-
- for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
- SyncMessageAckFuture fut = e.getValue();
-
- if (fut.nodeId().equals(nodeId)) {
- SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
-
- if (fut0 != null) {
- ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
- "Node left grid while sending message to: " + nodeId);
-
- fut0.onDone(err);
- }
- }
- }
- }
- }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ ctx.event().addLocalEventListener(new DiscoveryListener(), EVT_NODE_LEFT, EVT_NODE_FAILED);
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -1424,6 +1386,55 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
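+ * On node left/failed: removes client info, unregisters auto-unsubscribe routines of that node, fails its pending sync message futures.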
+ */
+ private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
+ /** {@inheritDoc} */
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof DiscoveryEvent;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+ UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+ clientInfos.remove(nodeId);
+
+ // Unregister handlers created by left node.
+ for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+ UUID routineId = e.getKey();
+ RemoteRoutineInfo info = e.getValue();
+
+ if (nodeId.equals(info.nodeId)) {
+ if (info.autoUnsubscribe)
+ unregisterRemote(routineId);
+
+ if (info.hnd.isQuery())
+ info.hnd.onNodeLeft();
+ }
+ }
+
+ for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
+ SyncMessageAckFuture fut = e.getValue();
+
+ if (fut.nodeId().equals(nodeId)) {
+ SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
+
+ if (fut0 != null) {
+ ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+ "Node left grid while sending message to: " + nodeId);
+
+ fut0.onDone(err);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int order() {
+ return 1;
+ }
+ }
+
+ /**
* Local routine info.
*/
@SuppressWarnings("PackageVisibleInnerClass")
http://git-wip-us.apache.org/repos/asf/ignite/blob/b95f76f8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8cdb3c0..5aca2f9 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1091,15 +1092,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** */
private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
- /** Discovery listener. */
- private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent : evt;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
-
- onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
- }
- };
+ /** Discovery listener. */
+ private final GridLocalEventListener discoLsnr = new DiscoveryListener();
/**
* @return {@code True} if ssl enabled.
@@ -3751,6 +3745,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/**
*
*/
+ private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
+ /** {@inheritDoc} */
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+ onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int order() {
+ return 0;
+ }
+ }
+
+ /**
+ *
+ */
private class ShmemWorker extends GridWorker {
/** */
private final IpcEndpoint endpoint;