You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/18 15:04:16 UTC

[17/46] ignite git commit: ignite-4946 GridCacheP2PUndeploySelfTest became failed

ignite-4946 GridCacheP2PUndeploySelfTest became failed

(cherry picked from commit d298e75)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd0b9295
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd0b9295
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd0b9295

Branch: refs/heads/ignite-1561-1
Commit: cd0b92950c6691c6fc1a26cb4f7e55f5ee459298
Parents: 20016a2
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Apr 13 15:52:20 2017 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Thu Apr 13 16:44:07 2017 +0300

----------------------------------------------------------------------
 .../eventstorage/GridEventStorageManager.java   | 341 ++++++++++---------
 1 file changed, 187 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0b9295/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index a2c64ba..406da2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EventListener;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -78,10 +80,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
  */
 public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> {
     /** Local event listeners. */
-    private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>();
-
-    /** Internal discovery listeners. */
-    private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new ConcurrentHashMap8<>();
 
     /** Busy lock to control activity of threads. */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -200,7 +199,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     @Override public void printMemoryStats() {
         int lsnrsCnt = 0;
 
-        for (Set<GridLocalEventListener> lsnrs0 : lsnrs.values())
+        for (Set<EventListener> lsnrs0 : lsnrs.values())
             lsnrsCnt += lsnrs0.size();
 
         X.println(">>>");
@@ -238,7 +237,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         msgLsnr = null;
 
         lsnrs.clear();
-        discoLsnrs.clear();
     }
 
     /** {@inheritDoc} */
@@ -274,6 +272,26 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @param evt Event to record.
      */
     public void record(Event evt) {
+        record0(evt);
+    }
+
+    /**
+     * Records discovery events.
+     *
+     * @param evt Event to record.
+     * @param discoCache Discovery cache.
+     */
+    public void record(DiscoveryEvent evt, DiscoCache discoCache) {
+        record0(evt, discoCache);
+    }
+
+    /**
+     * Records event if it's recordable.
+     *
+     * @param evt Event to record.
+     * @param params Additional parameters.
+     */
+    private void record0(Event evt, Object... params) {
         assert evt != null;
 
         if (!enterBusy())
@@ -297,31 +315,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
                 }
 
             if (isRecordable(type))
-                notifyListeners(evt);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Records discovery events.
-     *
-     * @param evt Event to record.
-     * @param discoCache Discovery cache.
-     */
-    public void record(DiscoveryEvent evt, DiscoCache discoCache) {
-        assert evt != null;
-
-        if (!enterBusy())
-            return;
-
-        try {
-            // Notify internal discovery listeners first.
-            notifyDiscoveryListeners(evt, discoCache);
-
-            // Notify all other registered listeners.
-            record(evt);
+                notifyListeners(lsnrs.get(evt.type()), evt, params);
         }
         finally {
             leaveBusy();
@@ -571,6 +565,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @param types Event types to subscribe listener for.
      */
     public void addLocalEventListener(IgnitePredicate<? extends Event> lsnr, int[] types) {
+        assert lsnr != null;
+
         try {
             ctx.resource().injectGeneric(lsnr);
         }
@@ -578,7 +574,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
             throw new IgniteException("Failed to inject resources to event listener: " + lsnr, e);
         }
 
-        addLocalEventListener(new UserListenerWrapper(lsnr), types);
+        addEventListener(new UserListenerWrapper(lsnr), types);
     }
 
     /**
@@ -594,20 +590,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         assert types != null;
         assert types.length > 0;
 
-        if (!enterBusy())
-            return;
-
-        try {
-            for (int t : types) {
-                getOrCreate(lsnrs, t).add(lsnr);
-
-                if (!isRecordable(t))
-                    U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
-            }
-        }
-        finally {
-            leaveBusy();
-        }
+        addEventListener(new LocalListenerWrapper(lsnr), types);
     }
 
     /**
@@ -620,27 +603,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     public void addLocalEventListener(GridLocalEventListener lsnr, int type, @Nullable int... types) {
         assert lsnr != null;
 
-        if (!enterBusy())
-            return;
-
-        try {
-            getOrCreate(lsnrs, type).add(lsnr);
-
-            if (!isRecordable(type))
-                U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
-
-            if (types != null) {
-                for (int t : types) {
-                    getOrCreate(lsnrs, t).add(lsnr);
-
-                    if (!isRecordable(t))
-                        U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
-                }
-            }
-        }
-        finally {
-            leaveBusy();
-        }
+        addEventListener(new LocalListenerWrapper(lsnr), type, types);
     }
 
     /**
@@ -656,12 +619,40 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         assert types != null;
         assert types.length > 0;
 
+        addEventListener(new DiscoveryListenerWrapper(lsnr), types);
+    }
+
+    /**
+     * Adds discovery event listener.
+     *
+     * @param lsnr Listener to add.
+     * @param type Event type to subscribe listener for.
+     * @param types Additional event types to subscribe listener for.
+     */
+    public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
+        assert lsnr != null;
+
+        addEventListener(new DiscoveryListenerWrapper(lsnr), type, types);
+    }
+
+    /**
+     * Adds local event listener. Note that this method specifically disallows an empty
+     * array of event types to prevent accidental subscription to all system events, which
+     * may lead to a drastic performance decrease.
+     *
+     * @param lsnr Listener to add.
+     * @param types Event types to subscribe listener for.
+     */
+    private void addEventListener(EventListener lsnr, int[] types) {
         if (!enterBusy())
             return;
 
         try {
             for (int t : types) {
-                getOrCreate(discoLsnrs, t).add(lsnr);
+                getOrCreate(lsnrs, t).add(lsnr);
+
+                if (!isRecordable(t))
+                    U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
             }
         }
         finally {
@@ -670,24 +661,28 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     }
 
     /**
-     * Adds discovery event listener.
+     * Adds local event listener.
      *
      * @param lsnr Listener to add.
      * @param type Event type to subscribe listener for.
      * @param types Additional event types to subscribe listener for.
      */
-    public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
-        assert lsnr != null;
-
+    private void addEventListener(EventListener lsnr, int type, @Nullable int... types) {
         if (!enterBusy())
             return;
 
         try {
-            getOrCreate(discoLsnrs, type).add(lsnr);
+            getOrCreate(lsnrs, type).add(lsnr);
+
+            if (!isRecordable(type))
+                U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
 
             if (types != null) {
                 for (int t : types) {
-                    getOrCreate(discoLsnrs, t).add(lsnr);
+                    getOrCreate(lsnrs, t).add(lsnr);
+
+                    if (!isRecordable(t))
+                        U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
                 }
             }
         }
@@ -696,6 +691,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         }
     }
 
+
     /**
      * @param lsnrs Listeners map.
      * @param type Event type.
@@ -727,7 +723,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @return Returns {@code true} if removed.
      */
     public boolean removeLocalEventListener(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) {
-        return removeLocalEventListener(new UserListenerWrapper(lsnr), types);
+        assert lsnr != null;
+
+        return removeEventListener(new UserListenerWrapper(lsnr), types);
     }
 
     /**
@@ -741,33 +739,21 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     public boolean removeLocalEventListener(GridLocalEventListener lsnr, @Nullable int... types) {
         assert lsnr != null;
 
-        boolean found = false;
-
-        if (F.isEmpty(types)) {
-            for (Set<GridLocalEventListener> set : lsnrs.values())
-                if (set.remove(lsnr))
-                    found = true;
-        }
-        else {
-            assert types != null;
-
-            for (int type : types) {
-                Set<GridLocalEventListener> set = lsnrs.get(type);
-
-                if (set != null && set.remove(lsnr))
-                    found = true;
-            }
-        }
-
-        if (lsnr instanceof UserListenerWrapper)
-        {
-            IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
+        return removeEventListener(new LocalListenerWrapper(lsnr), types);
+    }
 
-            if (p instanceof PlatformEventFilterListener)
-                ((PlatformEventFilterListener)p).onClose();
-        }
+    /**
+     * Removes listener for specified events, if any. If no event types are provided, it
+     * removes the listener for all its registered events.
+     *
+     * @param lsnr Listener.
+     * @param types Event types.
+     * @return Returns {@code true} if removed.
+     */
+    public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+        assert lsnr != null;
 
-        return found;
+        return removeEventListener(new DiscoveryListenerWrapper(lsnr), types);
     }
 
     /**
@@ -778,13 +764,13 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @param types Event types.
      * @return Returns {@code true} if removed.
      */
-    public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+    private boolean removeEventListener(EventListener lsnr, @Nullable int[] types) {
         assert lsnr != null;
 
         boolean found = false;
 
         if (F.isEmpty(types)) {
-            for (Set<DiscoveryEventListener> set : discoLsnrs.values())
+            for (Set<EventListener> set : lsnrs.values())
                 if (set.remove(lsnr))
                     found = true;
         }
@@ -792,13 +778,21 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
             assert types != null;
 
             for (int type : types) {
-                Set<DiscoveryEventListener> set = discoLsnrs.get(type);
+                Set<EventListener> set = lsnrs.get(type);
 
                 if (set != null && set.remove(lsnr))
                     found = true;
             }
         }
 
+        if (lsnr instanceof UserListenerWrapper)
+        {
+            IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
+
+            if (p instanceof PlatformEventFilterListener)
+                ((PlatformEventFilterListener)p).onClose();
+        }
+
         return found;
     }
 
@@ -862,62 +856,18 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     }
 
     /**
-     * @param evt Event to notify about.
-     */
-    private void notifyListeners(Event evt) {
-        assert evt != null;
-
-        notifyListeners(lsnrs.get(evt.type()), evt);
-    }
-
-    /**
      * @param set Set of listeners.
      * @param evt Grid event.
      */
-    private void notifyListeners(@Nullable Collection<GridLocalEventListener> set, Event evt) {
-        assert evt != null;
-
-        if (!F.isEmpty(set)) {
-            assert set != null;
-
-            for (GridLocalEventListener lsnr : set) {
-                try {
-                    lsnr.onEvent(evt);
-                }
-                catch (Throwable e) {
-                    U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
-
-                    if (e instanceof Error)
-                        throw (Error)e;
-                }
-            }
-        }
-    }
-
-    /**
-     * @param evt Discovery event
-     * @param cache Discovery cache.
-     */
-    private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
-        assert evt != null;
-
-        notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
-    }
-
-    /**
-     * @param set Set of listeners.
-     * @param evt Discovery event.
-     * @param cache Discovery cache.
-     */
-    private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
+    private void notifyListeners(@Nullable Collection<EventListener> set, Event evt, Object[] params) {
         assert evt != null;
 
         if (!F.isEmpty(set)) {
             assert set != null;
 
-            for (DiscoveryEventListener lsnr : set) {
+            for (EventListener lsnr : set) {
                 try {
-                    lsnr.onEvent(evt, cache);
+                    ((ListenerWrapper)lsnr).onEvent(evt, params);
                 }
                 catch (Throwable e) {
                     U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
@@ -1307,10 +1257,93 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         }
     }
 
+    /** */
+    private abstract static class ListenerWrapper implements EventListener {
+        abstract void onEvent(Event evt, Object[] params);
+    }
+
+    /**
+     * Wraps local listener
+     */
+    private static final class LocalListenerWrapper extends ListenerWrapper {
+        /** */
+        private final GridLocalEventListener lsnr;
+
+        /**
+         * @param lsnr Listener.
+         */
+        private LocalListenerWrapper(GridLocalEventListener lsnr) {
+            this.lsnr = lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override void onEvent(Event evt, Object[] params) {
+            lsnr.onEvent(evt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            LocalListenerWrapper wrapper = (LocalListenerWrapper)o;
+
+            return lsnr.equals(wrapper.lsnr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return lsnr.hashCode();
+        }
+    }
+
+    /**
+     * Wraps local discovery event listener.
+     */
+    private static final class DiscoveryListenerWrapper extends ListenerWrapper {
+        /** */
+        private final DiscoveryEventListener lsnr;
+
+        /**
+         * @param lsnr Listener.
+         */
+        private DiscoveryListenerWrapper(DiscoveryEventListener lsnr) {
+            this.lsnr = lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override void onEvent(Event evt, Object[] params) {
+            // No checks here since only DiscoveryManager produces DiscoveryEvents
+            // and it uses an overloaded method with additional parameters.
+            lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            DiscoveryListenerWrapper wrapper = (DiscoveryListenerWrapper)o;
+
+            return lsnr.equals(wrapper.lsnr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return lsnr.hashCode();
+        }
+    }
+
     /**
-     * Wraps user listener predicate provided via {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)}.
+     * Wraps user listener predicate provided via {@link IgniteEvents#localListen(IgnitePredicate, int...)}.
      */
-    private class UserListenerWrapper implements GridLocalEventListener {
+    private final class UserListenerWrapper extends ListenerWrapper {
         /** */
         private final IgnitePredicate<Event> lsnr;
 
@@ -1329,9 +1362,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         }
 
         /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
+        @Override void onEvent(Event evt, Object[] params) {
             if (!lsnr.apply(evt))
-                removeLocalEventListener(this);
+                removeEventListener(this, null);
         }
 
         /** {@inheritDoc} */