You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/11/29 10:28:50 UTC

[09/12] ignite git commit: Minor: moved custom events processing in GridContinuousProcessor's methods.

Minor: moved custom events processing in GridContinuousProcessor's methods.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe806701
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe806701
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe806701

Branch: refs/heads/ignite-7016
Commit: fe806701ec42c951378d32ff931e98411260f997
Parents: 6afbc09
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 29 11:34:23 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 29 13:28:34 2017 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 132 ++++++++++++-------
 1 file changed, 85 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fe806701/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fa52be2..571d654 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -176,8 +176,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
-                    if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
-                        processStartRequest(snd, msg);
+                    if (ctx.isStopping())
+                        return;
+
+                    processStartRequest(snd, msg);
                 }
             });
 
@@ -186,39 +188,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StartRoutineAckDiscoveryMessage msg) {
-                    StartFuture fut = startFuts.remove(msg.routineId());
-
-                    if (fut != null) {
-                        if (msg.errs().isEmpty()) {
-                            LocalRoutineInfo routine = locInfos.get(msg.routineId());
-
-                            // Update partition counters.
-                            if (routine != null && routine.handler().isQuery()) {
-                                Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode();
-                                Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
-
-                                GridCacheAdapter<Object, Object> interCache =
-                                    ctx.cache().internalCache(routine.handler().cacheName());
-
-                                GridCacheContext cctx = interCache != null ? interCache.context() : null;
-
-                                if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                                    cntrsPerNode.put(ctx.localNodeId(),
-                                        toCountersMap(cctx.topology().localUpdateCounters(false)));
-
-                                routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
-                            }
-
-                            fut.onRemoteRegistered();
-                        }
-                        else {
-                            IgniteCheckedException firstEx = F.first(msg.errs().values());
-
-                            fut.onDone(firstEx);
+                    if (ctx.isStopping())
+                        return;
 
-                            stopRoutine(msg.routineId());
-                        }
-                    }
+                    processStartAckRequest(topVer, msg);
                 }
             });
 
@@ -227,16 +200,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
-                    if (!snd.id().equals(ctx.localNodeId())) {
-                        UUID routineId = msg.routineId();
-
-                        unregisterRemote(routineId);
-                    }
+                    if (ctx.isStopping())
+                        return;
 
-                    for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
-                        if (clientInfo.remove(msg.routineId()) != null)
-                            break;
-                    }
+                    processStopRequest(snd, msg);
                 }
             });
 
@@ -245,10 +212,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StopRoutineAckDiscoveryMessage msg) {
-                    StopFuture fut = stopFuts.remove(msg.routineId());
+                    if (ctx.isStopping())
+                        return;
 
-                    if (fut != null)
-                        fut.onDone();
+                    processStopAckRequest(msg);
                 }
             });
 
@@ -459,7 +426,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
         if (log.isDebugEnabled()) {
-            log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
+            log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
                     ", loc=" + ctx.localNodeId() +
                     ", data=" + data.joiningNodeData() +
                     ']');
@@ -976,11 +943,82 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param msg Message.
+     */
+    private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) {
+        StopFuture fut = stopFuts.remove(msg.routineId());
+
+        if (fut != null)
+            fut.onDone();
+    }
+
+    /**
+     * @param snd Sender node.
+     * @param msg Message/
+     */
+    private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+        if (!snd.id().equals(ctx.localNodeId())) {
+            UUID routineId = msg.routineId();
+
+            unregisterRemote(routineId);
+        }
+
+        for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+            if (clientInfo.remove(msg.routineId()) != null)
+                break;
+        }
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param msg Message.
+     */
+    private void processStartAckRequest(AffinityTopologyVersion topVer,
+        StartRoutineAckDiscoveryMessage msg) {
+        StartFuture fut = startFuts.remove(msg.routineId());
+
+        if (fut != null) {
+            if (msg.errs().isEmpty()) {
+                LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+                // Update partition counters.
+                if (routine != null && routine.handler().isQuery()) {
+                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode();
+                    Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
+
+                    GridCacheAdapter<Object, Object> interCache =
+                        ctx.cache().internalCache(routine.handler().cacheName());
+
+                    GridCacheContext cctx = interCache != null ? interCache.context() : null;
+
+                    if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+                        cntrsPerNode.put(ctx.localNodeId(),
+                            toCountersMap(cctx.topology().localUpdateCounters(false)));
+
+                    routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
+                }
+
+                fut.onRemoteRegistered();
+            }
+            else {
+                IgniteCheckedException firstEx = F.first(msg.errs().values());
+
+                fut.onDone(firstEx);
+
+                stopRoutine(msg.routineId());
+            }
+        }
+    }
+
+    /**
      * @param node Sender.
      * @param req Start request.
      */
     private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
         UUID routineId = req.routineId();
+        if (node.id().equals(ctx.localNodeId()))
+            return;
+
         StartRequestData data = req.startRequestData();
 
         GridContinuousHandler hnd = data.handler();