You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/13 13:31:07 UTC
[20/21] ignite git commit: ignite-4946 GridCacheP2PUndeploySelfTest
became failed
ignite-4946 GridCacheP2PUndeploySelfTest became failed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d298e756
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d298e756
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d298e756
Branch: refs/heads/ignite-3477-master
Commit: d298e75610e192ef1ca17fb9e678bd83db64e1a4
Parents: e922dda
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Apr 13 15:52:20 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Apr 13 15:52:20 2017 +0300
----------------------------------------------------------------------
.../eventstorage/GridEventStorageManager.java | 341 ++++++++++---------
1 file changed, 187 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d298e756/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index d49463e..bb57c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EventListener;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -80,10 +82,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
*/
public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> {
/** Local event listeners. */
- private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>();
-
- /** Internal discovery listeners. */
- private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new ConcurrentHashMap8<>();
/** Busy lock to control activity of threads. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -205,7 +204,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
@Override public void printMemoryStats() {
int lsnrsCnt = 0;
- for (Set<GridLocalEventListener> lsnrs0 : lsnrs.values())
+ for (Set<EventListener> lsnrs0 : lsnrs.values())
lsnrsCnt += lsnrs0.size();
X.println(">>>");
@@ -254,7 +253,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
msgLsnr = null;
lsnrs.clear();
- discoLsnrs.clear();
stopped = true;
}
@@ -296,6 +294,26 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param evt Event to record.
*/
public void record(Event evt) {
+ record0(evt);
+ }
+
+ /**
+ * Records discovery events.
+ *
+ * @param evt Event to record.
+ * @param discoCache Discovery cache.
+ */
+ public void record(DiscoveryEvent evt, DiscoCache discoCache) {
+ record0(evt, discoCache);
+ }
+
+ /**
+ * Records event if it's recordable.
+ *
+ * @param evt Event to record.
+ * @param params Additional parameters.
+ */
+ private void record0(Event evt, Object... params) {
assert evt != null;
if (!enterBusy())
@@ -319,31 +337,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
if (isRecordable(type))
- notifyListeners(evt);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Records discovery events.
- *
- * @param evt Event to record.
- * @param discoCache Discovery cache.
- */
- public void record(DiscoveryEvent evt, DiscoCache discoCache) {
- assert evt != null;
-
- if (!enterBusy())
- return;
-
- try {
- // Notify internal discovery listeners first.
- notifyDiscoveryListeners(evt, discoCache);
-
- // Notify all other registered listeners.
- record(evt);
+ notifyListeners(lsnrs.get(evt.type()), evt, params);
}
finally {
leaveBusy();
@@ -593,6 +587,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param types Event types to subscribe listener for.
*/
public void addLocalEventListener(IgnitePredicate<? extends Event> lsnr, int[] types) {
+ assert lsnr != null;
+
try {
ctx.resource().injectGeneric(lsnr);
}
@@ -600,7 +596,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
throw new IgniteException("Failed to inject resources to event listener: " + lsnr, e);
}
- addLocalEventListener(new UserListenerWrapper(lsnr), types);
+ addEventListener(new UserListenerWrapper(lsnr), types);
}
/**
@@ -616,20 +612,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
assert types != null;
assert types.length > 0;
- if (!enterBusy())
- return;
-
- try {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
- }
- finally {
- leaveBusy();
- }
+ addEventListener(new LocalListenerWrapper(lsnr), types);
}
/**
@@ -642,27 +625,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
public void addLocalEventListener(GridLocalEventListener lsnr, int type, @Nullable int... types) {
assert lsnr != null;
- if (!enterBusy())
- return;
-
- try {
- getOrCreate(lsnrs, type).add(lsnr);
-
- if (!isRecordable(type))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
-
- if (types != null) {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
- }
- }
- finally {
- leaveBusy();
- }
+ addEventListener(new LocalListenerWrapper(lsnr), type, types);
}
/**
@@ -678,12 +641,40 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
assert types != null;
assert types.length > 0;
+ addEventListener(new DiscoveryListenerWrapper(lsnr), types);
+ }
+
+ /**
+ * Adds discovery event listener.
+ *
+ * @param lsnr Listener to add.
+ * @param type Event type to subscribe listener for.
+ * @param types Additional event types to subscribe listener for.
+ */
+ public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
+ assert lsnr != null;
+
+ addEventListener(new DiscoveryListenerWrapper(lsnr), type, types);
+ }
+
+ /**
+ * Adds local event listener. Note that this method specifically disallows an empty
+ * array of event types to prevent accidental subscription to all system events, which
+ * may lead to a drastic performance decrease.
+ *
+ * @param lsnr Listener to add.
+ * @param types Event types to subscribe listener for.
+ */
+ private void addEventListener(EventListener lsnr, int[] types) {
if (!enterBusy())
return;
try {
for (int t : types) {
- getOrCreate(discoLsnrs, t).add(lsnr);
+ getOrCreate(lsnrs, t).add(lsnr);
+
+ if (!isRecordable(t))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
}
}
finally {
@@ -692,24 +683,28 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * Adds discovery event listener.
+ * Adds local event listener.
*
* @param lsnr Listener to add.
* @param type Event type to subscribe listener for.
* @param types Additional event types to subscribe listener for.
*/
- public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
- assert lsnr != null;
-
+ private void addEventListener(EventListener lsnr, int type, @Nullable int... types) {
if (!enterBusy())
return;
try {
- getOrCreate(discoLsnrs, type).add(lsnr);
+ getOrCreate(lsnrs, type).add(lsnr);
+
+ if (!isRecordable(type))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
if (types != null) {
for (int t : types) {
- getOrCreate(discoLsnrs, t).add(lsnr);
+ getOrCreate(lsnrs, t).add(lsnr);
+
+ if (!isRecordable(t))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
}
}
}
@@ -718,6 +713,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
}
+
/**
* @param lsnrs Listeners map.
* @param type Event type.
@@ -749,7 +745,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @return Returns {@code true} if removed.
*/
public boolean removeLocalEventListener(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) {
- return removeLocalEventListener(new UserListenerWrapper(lsnr), types);
+ assert lsnr != null;
+
+ return removeEventListener(new UserListenerWrapper(lsnr), types);
}
/**
@@ -763,33 +761,21 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
public boolean removeLocalEventListener(GridLocalEventListener lsnr, @Nullable int... types) {
assert lsnr != null;
- boolean found = false;
-
- if (F.isEmpty(types)) {
- for (Set<GridLocalEventListener> set : lsnrs.values())
- if (set.remove(lsnr))
- found = true;
- }
- else {
- assert types != null;
-
- for (int type : types) {
- Set<GridLocalEventListener> set = lsnrs.get(type);
-
- if (set != null && set.remove(lsnr))
- found = true;
- }
- }
-
- if (lsnr instanceof UserListenerWrapper)
- {
- IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
+ return removeEventListener(new LocalListenerWrapper(lsnr), types);
+ }
- if (p instanceof PlatformEventFilterListener)
- ((PlatformEventFilterListener)p).onClose();
- }
+ /**
+ * Removes listener for specified events, if any. If no event types are provided, it
+ * removes the listener for all its registered events.
+ *
+ * @param lsnr Listener.
+ * @param types Event types.
+ * @return Returns {@code true} if removed.
+ */
+ public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+ assert lsnr != null;
- return found;
+ return removeEventListener(new DiscoveryListenerWrapper(lsnr), types);
}
/**
@@ -800,13 +786,13 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param types Event types.
* @return Returns {@code true} if removed.
*/
- public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+ private boolean removeEventListener(EventListener lsnr, @Nullable int[] types) {
assert lsnr != null;
boolean found = false;
if (F.isEmpty(types)) {
- for (Set<DiscoveryEventListener> set : discoLsnrs.values())
+ for (Set<EventListener> set : lsnrs.values())
if (set.remove(lsnr))
found = true;
}
@@ -814,13 +800,21 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
assert types != null;
for (int type : types) {
- Set<DiscoveryEventListener> set = discoLsnrs.get(type);
+ Set<EventListener> set = lsnrs.get(type);
if (set != null && set.remove(lsnr))
found = true;
}
}
+ if (lsnr instanceof UserListenerWrapper)
+ {
+ IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
+
+ if (p instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)p).onClose();
+ }
+
return found;
}
@@ -884,62 +878,18 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * @param evt Event to notify about.
- */
- private void notifyListeners(Event evt) {
- assert evt != null;
-
- notifyListeners(lsnrs.get(evt.type()), evt);
- }
-
- /**
* @param set Set of listeners.
* @param evt Grid event.
*/
- private void notifyListeners(@Nullable Collection<GridLocalEventListener> set, Event evt) {
+ private void notifyListeners(@Nullable Collection<EventListener> set, Event evt, Object[] params) {
assert evt != null;
if (!F.isEmpty(set)) {
assert set != null;
- for (GridLocalEventListener lsnr : set) {
+ for (EventListener lsnr : set) {
try {
- lsnr.onEvent(evt);
- }
- catch (Throwable e) {
- U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- }
- }
- }
-
- /**
- * @param evt Discovery event
- * @param cache Discovery cache.
- */
- private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
- }
-
- /**
- * @param set Set of listeners.
- * @param evt Discovery event.
- * @param cache Discovery cache.
- */
- private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- if (!F.isEmpty(set)) {
- assert set != null;
-
- for (DiscoveryEventListener lsnr : set) {
- try {
- lsnr.onEvent(evt, cache);
+ ((ListenerWrapper)lsnr).onEvent(evt, params);
}
catch (Throwable e) {
U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
@@ -1337,10 +1287,93 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
}
+ /** */
+ private abstract static class ListenerWrapper implements EventListener {
+ abstract void onEvent(Event evt, Object[] params);
+ }
+
+ /**
+ * Wraps local listener
+ */
+ private static final class LocalListenerWrapper extends ListenerWrapper {
+ /** */
+ private final GridLocalEventListener lsnr;
+
+ /**
+ * @param lsnr Listener.
+ */
+ private LocalListenerWrapper(GridLocalEventListener lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override void onEvent(Event evt, Object[] params) {
+ lsnr.onEvent(evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ LocalListenerWrapper wrapper = (LocalListenerWrapper)o;
+
+ return lsnr.equals(wrapper.lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return lsnr.hashCode();
+ }
+ }
+
+ /**
+ * Wraps discovery local listener
+ */
+ private static final class DiscoveryListenerWrapper extends ListenerWrapper {
+ /** */
+ private final DiscoveryEventListener lsnr;
+
+ /**
+ * @param lsnr Listener.
+ */
+ private DiscoveryListenerWrapper(DiscoveryEventListener lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override void onEvent(Event evt, Object[] params) {
+ // No checks here since only DiscoveryManager produces DiscoveryEvents
+ // and it uses an overloaded method with additional parameters.
+ lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DiscoveryListenerWrapper wrapper = (DiscoveryListenerWrapper)o;
+
+ return lsnr.equals(wrapper.lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return lsnr.hashCode();
+ }
+ }
+
/**
- * Wraps user listener predicate provided via {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)}.
+ * Wraps user listener predicate provided via {@link IgniteEvents#localListen(IgnitePredicate, int...)}.
*/
- private class UserListenerWrapper implements GridLocalEventListener {
+ private final class UserListenerWrapper extends ListenerWrapper {
/** */
private final IgnitePredicate<Event> lsnr;
@@ -1359,9 +1392,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
+ @Override void onEvent(Event evt, Object[] params) {
if (!lsnr.apply(evt))
- removeLocalEventListener(this);
+ removeEventListener(this, null);
}
/** {@inheritDoc} */