You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/11 09:38:32 UTC
ignite git commit: ignite-5727
Repository: ignite
Updated Branches:
refs/heads/ignite-5727 [created] 560e1025e
ignite-5727
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/560e1025
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/560e1025
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/560e1025
Branch: refs/heads/ignite-5727
Commit: 560e1025e31c719f2b0e69253bd26f7548528107
Parents: 2a2c803
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 12:38:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 12:38:24 2017 +0300
----------------------------------------------------------------------
.../eventstorage/GridEventStorageManager.java | 275 ++++++++++---------
.../eventstorage/HighPriorityListener.java | 25 ++
.../processors/cache/GridCacheMvccManager.java | 5 -
.../processors/cache/GridCacheProcessor.java | 2 -
.../communication/tcp/TcpCommunicationSpi.java | 24 +-
5 files changed, 179 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/560e1025/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 1714cbb..dd54b83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -83,10 +83,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
*/
public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> {
/** Local event listeners. */
- private final ConcurrentMap<Integer, Set<EventListener>> lsnrs = new ConcurrentHashMap8<>();
-
- /** Internal discovery listeners. */
- private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Integer, Listeners> lsnrs = new ConcurrentHashMap8<>();
/** Busy lock to control activity of threads. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -208,8 +205,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
@Override public void printMemoryStats() {
int lsnrsCnt = 0;
- for (Set<EventListener> lsnrs0 : lsnrs.values())
- lsnrsCnt += lsnrs0.size();
+ for (Listeners lsnrs0 : lsnrs.values())
+ lsnrsCnt += lsnrs0.lsnrs.size();
X.println(">>>");
X.println(">>> Event storage manager memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']');
@@ -250,9 +247,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
try {
if (msgLsnr != null)
- ctx.io().removeMessageListener(
- TOPIC_EVENT,
- msgLsnr);
+ ctx.io().removeMessageListener(TOPIC_EVENT, msgLsnr);
msgLsnr = null;
@@ -332,13 +327,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
// Override user recordable settings for daemon node.
- if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type))
+ if ((isDaemon || isUserRecordable(type)) && !isHiddenEvent(type)) {
try {
getSpi().record(evt);
}
catch (IgniteSpiException e) {
U.error(log, "Failed to record event: " + evt, e);
}
+ }
if (isRecordable(type))
notifyListeners(lsnrs.get(evt.type()), evt, params);
@@ -669,17 +665,13 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param lsnr Listener to add.
* @param types Event types to subscribe listener for.
*/
- private void addEventListener(EventListener lsnr, int[] types) {
+ private void addEventListener(ListenerWrapper lsnr, int[] types) {
if (!enterBusy())
return;
try {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
+ for (int t : types)
+ registerListener(lsnr, t);
}
finally {
leaveBusy();
@@ -693,23 +685,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param type Event type to subscribe listener for.
* @param types Additional event types to subscribe listener for.
*/
- private void addEventListener(EventListener lsnr, int type, @Nullable int... types) {
+ private void addEventListener(ListenerWrapper lsnr, int type, @Nullable int... types) {
if (!enterBusy())
return;
try {
- getOrCreate(lsnrs, type).add(lsnr);
-
- if (!isRecordable(type))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
+ registerListener(lsnr, type);
if (types != null) {
- for (int t : types) {
- getOrCreate(lsnrs, t).add(lsnr);
-
- if (!isRecordable(t))
- U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
- }
+ for (int t : types)
+ registerListener(lsnr, t);
}
}
finally {
@@ -718,25 +703,25 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * @param lsnrs Listeners map.
+ * @param lsnr Listener.
* @param type Event type.
- * @return Listeners for given event type.
*/
- private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) {
- Set<T> set = lsnrs.get(type);
+ private void registerListener(ListenerWrapper lsnr, int type) {
+ Listeners lsnrs0 = lsnrs.get(type);
- if (set == null) {
- set = new GridConcurrentLinkedHashSet<>();
+ if (lsnrs0 == null) {
+ lsnrs0 = new Listeners();
- Set<T> prev = lsnrs.putIfAbsent(type, set);
+ Listeners prev = lsnrs.putIfAbsent(type, lsnrs0);
if (prev != null)
- set = prev;
+ lsnrs0 = prev;
}
- assert set != null;
+ lsnrs0.addListener(lsnr);
- return set;
+ if (!isRecordable(type))
+ U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
}
/**
@@ -789,29 +774,29 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param types Event types.
* @return Returns {@code true} if removed.
*/
- private boolean removeEventListener(EventListener lsnr, @Nullable int[] types) {
+ private boolean removeEventListener(ListenerWrapper lsnr, @Nullable int[] types) {
assert lsnr != null;
boolean found = false;
if (F.isEmpty(types)) {
- for (Set<EventListener> set : lsnrs.values())
- if (set.remove(lsnr))
+ for (Listeners set : lsnrs.values()) {
+ if (set.removeListener(lsnr))
found = true;
+ }
}
else {
assert types != null;
for (int type : types) {
- Set<EventListener> set = lsnrs.get(type);
+ Listeners set = lsnrs.get(type);
- if (set != null && set.remove(lsnr))
+ if (set != null && set.removeListener(lsnr))
found = true;
}
}
- if (lsnr instanceof UserListenerWrapper)
- {
+ if (lsnr instanceof UserListenerWrapper) {
IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
if (p instanceof PlatformEventFilterListener)
@@ -845,96 +830,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- *
- * @param timeout Timeout.
- * @param c Optional continuation.
- * @param p Optional predicate.
- * @param types Event types to wait for.
- * @return Event.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- public Event waitForEvent(long timeout, @Nullable Runnable c,
- @Nullable final IgnitePredicate<? super Event> p, int... types) throws IgniteCheckedException {
- assert timeout >= 0;
-
- final GridFutureAdapter<Event> fut = new GridFutureAdapter<>();
-
- addLocalEventListener(new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- if (p == null || p.apply(evt)) {
- fut.onDone(evt);
-
- removeLocalEventListener(this);
- }
- }
- }, types);
-
- try {
- if (c != null)
- c.run();
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
-
- return fut.get(timeout);
- }
-
- /**
- * @param set Set of listeners.
+ * @param lsnrs Set of listeners.
* @param evt Grid event.
+ * @param params Event parameters.
*/
- private void notifyListeners(@Nullable Collection<EventListener> set, Event evt, Object[] params) {
+ private void notifyListeners(@Nullable Listeners lsnrs, Event evt, Object[] params) {
assert evt != null;
- if (!F.isEmpty(set)) {
- assert set != null;
-
- for (EventListener lsnr : set) {
- try {
- ((ListenerWrapper)lsnr).onEvent(evt, params);
- }
- catch (Throwable e) {
- U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
+ if (lsnrs != null) {
+ notifyListeners(lsnrs.highPriorityLsnrs, evt, params);
- if (e instanceof Error)
- throw (Error)e;
- }
- }
+ notifyListeners(lsnrs.lsnrs, evt, params);
}
}
/**
- * @param evt Discovery event
- * @param cache Discovery cache.
- */
- private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
- }
-
- /**
- * @param set Set of listeners.
- * @param evt Discovery event.
- * @param cache Discovery cache.
+ * @param lsnrs Listeners collection.
+ * @param evt Event.
+ * @param params Event parameters.
*/
- private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
- assert evt != null;
-
- if (!F.isEmpty(set)) {
- assert set != null;
+ private void notifyListeners(@Nullable Collection<ListenerWrapper> lsnrs, Event evt, Object[] params) {
+ if (lsnrs == null || lsnrs.isEmpty())
+ return;
- for (DiscoveryEventListener lsnr : set) {
- try {
- lsnr.onEvent(evt, cache);
- }
- catch (Throwable e) {
- U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
+ for (EventListener lsnr : lsnrs) {
+ try {
+ ((ListenerWrapper)lsnr).onEvent(evt, params);
+ }
+ catch (Throwable e) {
+ U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
- if (e instanceof Error)
- throw (Error)e;
- }
+ if (e instanceof Error)
+ throw (Error)e;
}
}
}
@@ -1208,16 +1135,6 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
- * @param arr Array.
- * @return Array copy.
- */
- private boolean[] copy(boolean[] arr) {
- assert arr != null;
-
- return Arrays.copyOf(arr, arr.length);
- }
-
- /**
*
*/
private class RequestListener implements GridMessageListener {
@@ -1329,9 +1246,81 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
}
- /** */
+ /**
+ *
+ */
+ static class Listeners {
+ /** */
+ private volatile List<ListenerWrapper> highPriorityLsnrs;
+
+ /** */
+ private final Set<ListenerWrapper> lsnrs = new GridConcurrentLinkedHashSet<>();
+
+ /**
+ * @param lsnr Listener to add.
+ */
+ void addListener(ListenerWrapper lsnr) {
+ if (lsnr.highPriority()) {
+ synchronized (this) {
+ List<ListenerWrapper> curLsnrs = highPriorityLsnrs;
+ List<ListenerWrapper> newLsnrs = new ArrayList<>();
+
+ if (curLsnrs != null)
+ newLsnrs.addAll(curLsnrs);
+
+ assert !newLsnrs.contains(lsnr) : lsnr;
+
+ newLsnrs.add(lsnr);
+
+ highPriorityLsnrs = newLsnrs;
+ }
+ }
+ else
+ lsnrs.add(lsnr);
+ }
+
+ /**
+ * @param lsnr Listener to remove.
+ * @return {@code True}
+ */
+ boolean removeListener(ListenerWrapper lsnr) {
+ if (lsnr.highPriority()) {
+ synchronized (this) {
+ List<ListenerWrapper> curLsnrs = highPriorityLsnrs;
+
+ if (curLsnrs == null)
+ return false;
+
+ List<ListenerWrapper> newLsnrs = new ArrayList<>(curLsnrs);
+
+ if (newLsnrs.remove(lsnr)) {
+ highPriorityLsnrs = newLsnrs;
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+ else
+ return lsnrs.remove(lsnr);
+ }
+ }
+
+ /**
+ *
+ */
private abstract static class ListenerWrapper implements EventListener {
+ /**
+ * @param evt Event.
+ * @param params Parameters.
+ */
abstract void onEvent(Event evt, Object[] params);
+
+ /**
+ * @return {@code True} if high priority listener.
+ */
+ abstract boolean highPriority();
}
/**
@@ -1370,6 +1359,11 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
@Override public int hashCode() {
return lsnr.hashCode();
}
+
+ /** {@inheritDoc} */
+ @Override boolean highPriority() {
+ return lsnr instanceof HighPriorityListener;
+ }
}
/**
@@ -1388,7 +1382,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
/** {@inheritDoc} */
@Override void onEvent(Event evt, Object[] params) {
- // No checks there since only DiscoveryManager produses DiscoveryEvents
+ // No checks there since only DiscoveryManager produces DiscoveryEvents
// and it uses an overloaded method with additional parameters
lsnr.onEvent((DiscoveryEvent)evt, (DiscoCache)params[0]);
}
@@ -1410,6 +1404,11 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
@Override public int hashCode() {
return lsnr.hashCode();
}
+
+ /** {@inheritDoc} */
+ @Override boolean highPriority() {
+ return lsnr instanceof HighPriorityListener;
+ }
}
/**
@@ -1450,12 +1449,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
UserListenerWrapper that = (UserListenerWrapper)o;
return lsnr.equals(that.lsnr);
-
}
/** {@inheritDoc} */
@Override public int hashCode() {
return lsnr.hashCode();
}
+
+ /** {@inheritDoc} */
+ @Override boolean highPriority() {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/560e1025/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
new file mode 100644
index 0000000..a840f80
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/HighPriorityListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.eventstorage;
+
+/**
+ *
+ */
+public interface HighPriorityListener {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/560e1025/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a6907b9..b156708 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -272,12 +272,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
exchLog = cctx.logger(getClass().getName() + ".exchange");
pendingExplicit = GridConcurrentFactory.newMap();
- }
- /**
- * Cache futures listener must be registered after communication listener.
- */
- public void registerEventListener() {
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/560e1025/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0488a14..9cedac6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -838,8 +838,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.query().onCacheKernalStart();
- sharedCtx.mvcc().registerEventListener();
-
sharedCtx.exchange().onKernalStart(active, false);
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/560e1025/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8cdb3c0..9c885ce 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1091,15 +1092,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** */
private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
- /** Discovery listener. */
- private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent : evt;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
-
- onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
- }
- };
+ /** */
+ private final GridLocalEventListener discoLsnr = new DiscoveryListener();
/**
* @return {@code True} if ssl enabled.
@@ -3751,6 +3745,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/**
*
*/
+ private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+ onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+ }
+ }
+
+ /**
+ *
+ */
private class ShmemWorker extends GridWorker {
/** */
private final IpcEndpoint endpoint;