You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 20:50:50 UTC
[17/20] incubator-ignite git commit: # IGNITE-709 Check
filter.apply(evt) in system thread, not in the discovery worker thread.
# IGNITE-709 Check filter.apply(evt) in system thread, not in the discovery worker thread.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3dd77820
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3dd77820
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3dd77820
Branch: refs/heads/ignite-23
Commit: 3dd778209c9341a2ffabcced703e31b9a1ae8705
Parents: 8aad099
Author: sevdokimov <se...@gridgain.com>
Authored: Mon May 25 18:31:33 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon May 25 18:32:55 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 109 ++++++++++---------
1 file changed, 56 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dd77820/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 0bba809..bb8366a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,78 +136,81 @@ class GridEventConsumeHandler implements GridContinuousHandler {
private boolean notificationInProgress;
- @Override public void onEvent(final Event evt) {
- if (filter == null || filter.apply(evt)) {
- if (loc) {
- if (!cb.apply(nodeId, evt))
- ctx.continuous().stopRoutine(routineId);
- }
- else {
- if (ctx.discovery().node(nodeId) == null)
- return;
+ @Override public void onEvent(Event evt) {
+ synchronized (notificationQueue) {
+ notificationQueue.add(new T3<>(nodeId, routineId, evt));
- synchronized (notificationQueue) {
- notificationQueue.add(new T3<>(nodeId, routineId, evt));
+ if (!notificationInProgress) {
+ ctx.getSystemExecutorService().submit(new Runnable() {
+ @Override public void run() {
+ while (true) {
+ T3<UUID, UUID, Event> t3;
- if (!notificationInProgress) {
- ctx.getSystemExecutorService().submit(new Runnable() {
- @Override public void run() {
- while (true) {
- T3<UUID, UUID, Event> t3;
+ synchronized (notificationQueue) {
+ t3 = notificationQueue.poll();
- synchronized (notificationQueue) {
- t3 = notificationQueue.poll();
+ if (t3 == null) {
+ notificationInProgress = false;
- if (t3 == null) {
- notificationInProgress = false;
+ return;
+ }
+ }
- return;
- }
- }
+ try {
+ Event evt = t3.get3();
- try {
- Event evt = t3.get3();
+ if (filter != null && !filter.apply(evt))
+ continue;
- EventWrapper wrapper = new EventWrapper(evt);
+ if (loc) {
+ if (!cb.apply(nodeId, evt)) {
+ ctx.continuous().stopRoutine(routineId);
- if (evt instanceof CacheEvent) {
- String cacheName = ((CacheEvent)evt).cacheName();
+ return;
+ }
- ClusterNode node = ctx.discovery().node(t3.get1());
+ continue;
+ }
- if (node == null)
- continue;
+ ClusterNode node = ctx.discovery().node(t3.get1());
- if (ctx.config().isPeerClassLoadingEnabled()
- && ctx.discovery().cacheNode(node, cacheName)) {
- wrapper.p2pMarshal(ctx.config().getMarshaller());
+ if (node == null)
+ continue;
- wrapper.cacheName = cacheName;
+ EventWrapper wrapper = new EventWrapper(evt);
- GridCacheDeploymentManager depMgr =
- ctx.cache().internalCache(cacheName).context().deploy();
+ if (evt instanceof CacheEvent) {
+ String cacheName = ((CacheEvent)evt).cacheName();
- depMgr.prepare(wrapper);
- }
- }
+ if (ctx.config().isPeerClassLoadingEnabled()
+ && ctx.discovery().cacheNode(node, cacheName)) {
+ wrapper.p2pMarshal(ctx.config().getMarshaller());
- ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
- false, false);
- }
- catch (ClusterTopologyCheckedException ignored) {
- // No-op.
- }
- catch (Throwable e) {
- U.error(ctx.log(GridEventConsumeHandler.class),
- "Failed to send event notification to node: " + nodeId, e);
+ wrapper.cacheName = cacheName;
+
+ GridCacheDeploymentManager depMgr =
+ ctx.cache().internalCache(cacheName).context().deploy();
+
+ depMgr.prepare(wrapper);
}
}
- }
- });
- notificationInProgress = true;
+ ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+ false, false);
+ }
+ catch (ClusterTopologyCheckedException
+ | IgniteInterruptedCheckedException ignored) {
+ // No-op.
+ }
+ catch (Throwable e) {
+ U.error(ctx.log(GridEventConsumeHandler.class),
+ "Failed to send event notification to node: " + nodeId, e);
+ }
+ }
}
- }
+ });
+
+ notificationInProgress = true;
}
}
}