You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 20:50:44 UTC

[11/20] incubator-ignite git commit: IGNITE-709 Bug fix avoid hang GridContinuousProcessor

IGNITE-709 Bug fix avoid hang GridContinuousProcessor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cb0e664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cb0e664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cb0e664

Branch: refs/heads/ignite-23
Commit: 5cb0e6643ba82ec021e0bd8f1aaf0a65d28b8391
Parents: d411238
Author: sevdokimov <se...@gridgain.com>
Authored: Mon May 25 12:39:19 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon May 25 12:39:19 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       | 81 +++++++++++++++-----
 1 file changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cb0e664/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 505204d..0bba809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.interop.*;
 import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
@@ -131,40 +131,81 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         lsnr = new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
+            /** node ID, routine ID, event */
+            private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();
+
+            private boolean notificationInProgress;
+
+            @Override public void onEvent(final Event evt) {
                 if (filter == null || filter.apply(evt)) {
                     if (loc) {
                         if (!cb.apply(nodeId, evt))
                             ctx.continuous().stopRoutine(routineId);
                     }
                     else {
-                        GridDiscoveryManager disco = ctx.discovery();
+                        if (ctx.discovery().node(nodeId) == null)
+                            return;
+
+                        synchronized (notificationQueue) {
+                            notificationQueue.add(new T3<>(nodeId, routineId, evt));
+
+                            if (!notificationInProgress) {
+                                ctx.getSystemExecutorService().submit(new Runnable() {
+                                    @Override public void run() {
+                                        while (true) {
+                                            T3<UUID, UUID, Event> t3;
+
+                                            synchronized (notificationQueue) {
+                                                t3 = notificationQueue.poll();
+
+                                                if (t3 == null) {
+                                                    notificationInProgress = false;
 
-                        ClusterNode node = disco.node(nodeId);
+                                                    return;
+                                                }
+                                            }
 
-                        if (node != null) {
-                            try {
-                                EventWrapper wrapper = new EventWrapper(evt);
+                                            try {
+                                                Event evt = t3.get3();
 
-                                if (evt instanceof CacheEvent) {
-                                    String cacheName = ((CacheEvent)evt).cacheName();
+                                                EventWrapper wrapper = new EventWrapper(evt);
 
-                                    if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) {
-                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+                                                if (evt instanceof CacheEvent) {
+                                                    String cacheName = ((CacheEvent)evt).cacheName();
 
-                                        wrapper.cacheName = cacheName;
+                                                    ClusterNode node = ctx.discovery().node(t3.get1());
 
-                                        GridCacheDeploymentManager depMgr =
-                                            ctx.cache().internalCache(cacheName).context().deploy();
+                                                    if (node == null)
+                                                        continue;
 
-                                        depMgr.prepare(wrapper);
+                                                    if (ctx.config().isPeerClassLoadingEnabled()
+                                                        && ctx.discovery().cacheNode(node, cacheName)) {
+                                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+
+                                                        wrapper.cacheName = cacheName;
+
+                                                        GridCacheDeploymentManager depMgr =
+                                                            ctx.cache().internalCache(cacheName).context().deploy();
+
+                                                        depMgr.prepare(wrapper);
+                                                    }
+                                                }
+
+                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+                                                    false, false);
+                                            }
+                                            catch (ClusterTopologyCheckedException ignored) {
+                                                // No-op.
+                                            }
+                                            catch (Throwable e) {
+                                                U.error(ctx.log(GridEventConsumeHandler.class),
+                                                    "Failed to send event notification to node: " + nodeId, e);
+                                            }
+                                        }
                                     }
-                                }
+                                });
 
-                                ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);
+                                notificationInProgress = true;
                             }
                         }
                     }