You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 20:50:44 UTC
[11/20] incubator-ignite git commit: IGNITE-709 Bug fix avoid hang
GridContinuousProcessor
IGNITE-709 Bug fix avoid hang GridContinuousProcessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cb0e664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cb0e664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cb0e664
Branch: refs/heads/ignite-23
Commit: 5cb0e6643ba82ec021e0bd8f1aaf0a65d28b8391
Parents: d411238
Author: sevdokimov <se...@gridgain.com>
Authored: Mon May 25 12:39:19 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon May 25 12:39:19 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 81 +++++++++++++++-----
1 file changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cb0e664/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 505204d..0bba809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
+import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.interop.*;
import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.continuous.*;
@@ -131,40 +131,81 @@ class GridEventConsumeHandler implements GridContinuousHandler {
final boolean loc = nodeId.equals(ctx.localNodeId());
lsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
+ /** node ID, routine ID, event */
+ private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();
+
+ private boolean notificationInProgress;
+
+ @Override public void onEvent(final Event evt) {
if (filter == null || filter.apply(evt)) {
if (loc) {
if (!cb.apply(nodeId, evt))
ctx.continuous().stopRoutine(routineId);
}
else {
- GridDiscoveryManager disco = ctx.discovery();
+ if (ctx.discovery().node(nodeId) == null)
+ return;
+
+ synchronized (notificationQueue) {
+ notificationQueue.add(new T3<>(nodeId, routineId, evt));
+
+ if (!notificationInProgress) {
+ ctx.getSystemExecutorService().submit(new Runnable() {
+ @Override public void run() {
+ while (true) {
+ T3<UUID, UUID, Event> t3;
+
+ synchronized (notificationQueue) {
+ t3 = notificationQueue.poll();
+
+ if (t3 == null) {
+ notificationInProgress = false;
- ClusterNode node = disco.node(nodeId);
+ return;
+ }
+ }
- if (node != null) {
- try {
- EventWrapper wrapper = new EventWrapper(evt);
+ try {
+ Event evt = t3.get3();
- if (evt instanceof CacheEvent) {
- String cacheName = ((CacheEvent)evt).cacheName();
+ EventWrapper wrapper = new EventWrapper(evt);
- if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) {
- wrapper.p2pMarshal(ctx.config().getMarshaller());
+ if (evt instanceof CacheEvent) {
+ String cacheName = ((CacheEvent)evt).cacheName();
- wrapper.cacheName = cacheName;
+ ClusterNode node = ctx.discovery().node(t3.get1());
- GridCacheDeploymentManager depMgr =
- ctx.cache().internalCache(cacheName).context().deploy();
+ if (node == null)
+ continue;
- depMgr.prepare(wrapper);
+ if (ctx.config().isPeerClassLoadingEnabled()
+ && ctx.discovery().cacheNode(node, cacheName)) {
+ wrapper.p2pMarshal(ctx.config().getMarshaller());
+
+ wrapper.cacheName = cacheName;
+
+ GridCacheDeploymentManager depMgr =
+ ctx.cache().internalCache(cacheName).context().deploy();
+
+ depMgr.prepare(wrapper);
+ }
+ }
+
+ ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+ false, false);
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ // No-op.
+ }
+ catch (Throwable e) {
+ U.error(ctx.log(GridEventConsumeHandler.class),
+ "Failed to send event notification to node: " + nodeId, e);
+ }
+ }
}
- }
+ });
- ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
- }
- catch (IgniteCheckedException e) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);
+ notificationInProgress = true;
}
}
}