You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 20:50:50 UTC

[17/20] incubator-ignite git commit: # IGNITE-709 Check filter.apply(evt) in system thread, not in the discovery worker thread.

# IGNITE-709 Check filter.apply(evt) in system thread, not in the discovery worker thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3dd77820
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3dd77820
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3dd77820

Branch: refs/heads/ignite-23
Commit: 3dd778209c9341a2ffabcced703e31b9a1ae8705
Parents: 8aad099
Author: sevdokimov <se...@gridgain.com>
Authored: Mon May 25 18:31:33 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon May 25 18:32:55 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       | 109 ++++++++++---------
 1 file changed, 56 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dd77820/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 0bba809..bb8366a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,78 +136,81 @@ class GridEventConsumeHandler implements GridContinuousHandler {
 
             private boolean notificationInProgress;
 
-            @Override public void onEvent(final Event evt) {
-                if (filter == null || filter.apply(evt)) {
-                    if (loc) {
-                        if (!cb.apply(nodeId, evt))
-                            ctx.continuous().stopRoutine(routineId);
-                    }
-                    else {
-                        if (ctx.discovery().node(nodeId) == null)
-                            return;
+            @Override public void onEvent(Event evt) {
+                synchronized (notificationQueue) {
+                    notificationQueue.add(new T3<>(nodeId, routineId, evt));
 
-                        synchronized (notificationQueue) {
-                            notificationQueue.add(new T3<>(nodeId, routineId, evt));
+                    if (!notificationInProgress) {
+                        ctx.getSystemExecutorService().submit(new Runnable() {
+                            @Override public void run() {
+                                while (true) {
+                                    T3<UUID, UUID, Event> t3;
 
-                            if (!notificationInProgress) {
-                                ctx.getSystemExecutorService().submit(new Runnable() {
-                                    @Override public void run() {
-                                        while (true) {
-                                            T3<UUID, UUID, Event> t3;
+                                    synchronized (notificationQueue) {
+                                        t3 = notificationQueue.poll();
 
-                                            synchronized (notificationQueue) {
-                                                t3 = notificationQueue.poll();
+                                        if (t3 == null) {
+                                            notificationInProgress = false;
 
-                                                if (t3 == null) {
-                                                    notificationInProgress = false;
+                                            return;
+                                        }
+                                    }
 
-                                                    return;
-                                                }
-                                            }
+                                    try {
+                                        Event evt = t3.get3();
 
-                                            try {
-                                                Event evt = t3.get3();
+                                        if (filter != null && !filter.apply(evt))
+                                            continue;
 
-                                                EventWrapper wrapper = new EventWrapper(evt);
+                                        if (loc) {
+                                            if (!cb.apply(nodeId, evt)) {
+                                                ctx.continuous().stopRoutine(routineId);
 
-                                                if (evt instanceof CacheEvent) {
-                                                    String cacheName = ((CacheEvent)evt).cacheName();
+                                                return;
+                                            }
 
-                                                    ClusterNode node = ctx.discovery().node(t3.get1());
+                                            continue;
+                                        }
 
-                                                    if (node == null)
-                                                        continue;
+                                        ClusterNode node = ctx.discovery().node(t3.get1());
 
-                                                    if (ctx.config().isPeerClassLoadingEnabled()
-                                                        && ctx.discovery().cacheNode(node, cacheName)) {
-                                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+                                        if (node == null)
+                                            continue;
 
-                                                        wrapper.cacheName = cacheName;
+                                        EventWrapper wrapper = new EventWrapper(evt);
 
-                                                        GridCacheDeploymentManager depMgr =
-                                                            ctx.cache().internalCache(cacheName).context().deploy();
+                                        if (evt instanceof CacheEvent) {
+                                            String cacheName = ((CacheEvent)evt).cacheName();
 
-                                                        depMgr.prepare(wrapper);
-                                                    }
-                                                }
+                                            if (ctx.config().isPeerClassLoadingEnabled()
+                                                && ctx.discovery().cacheNode(node, cacheName)) {
+                                                wrapper.p2pMarshal(ctx.config().getMarshaller());
 
-                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
-                                                    false, false);
-                                            }
-                                            catch (ClusterTopologyCheckedException ignored) {
-                                                // No-op.
-                                            }
-                                            catch (Throwable e) {
-                                                U.error(ctx.log(GridEventConsumeHandler.class),
-                                                    "Failed to send event notification to node: " + nodeId, e);
+                                                wrapper.cacheName = cacheName;
+
+                                                GridCacheDeploymentManager depMgr =
+                                                    ctx.cache().internalCache(cacheName).context().deploy();
+
+                                                depMgr.prepare(wrapper);
                                             }
                                         }
-                                    }
-                                });
 
-                                notificationInProgress = true;
+                                        ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+                                            false, false);
+                                    }
+                                    catch (ClusterTopologyCheckedException
+                                        | IgniteInterruptedCheckedException ignored) {
+                                        // No-op.
+                                    }
+                                    catch (Throwable e) {
+                                        U.error(ctx.log(GridEventConsumeHandler.class),
+                                            "Failed to send event notification to node: " + nodeId, e);
+                                    }
+                                }
                             }
-                        }
+                        });
+
+                        notificationInProgress = true;
                     }
                 }
             }