You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/10 06:54:06 UTC

ignite git commit: ignite-3038

Repository: ignite
Updated Branches:
  refs/heads/ignite-3038 645adfdc5 -> c40392cc7


ignite-3038


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c40392cc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c40392cc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c40392cc

Branch: refs/heads/ignite-3038
Commit: c40392cc7954f4a148883b8843bd98f4a5296091
Parents: 645adfd
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 10 09:53:59 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 10 09:53:59 2016 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c40392cc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 74cadd9..e96e646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -257,9 +257,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
-                    UUID routineId = msg.routineId();
+                    if (!snd.id().equals(ctx.localNodeId())) {
+                        UUID routineId = msg.routineId();
 
-                    unregisterRemote(routineId);
+                        unregisterRemote(routineId);
+                    }
 
                     for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
                         if (clientInfo.remove(msg.routineId()) != null)
@@ -768,6 +770,21 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (doStop) {
+            // Unregister routine locally.
+            LocalRoutineInfo routine = locInfos.remove(routineId);
+
+            // Finish if routine is not found (wrong ID is provided).
+            if (routine == null) {
+                stopFuts.remove(routineId);
+
+                fut.onDone();
+
+                return fut;
+            }
+
+            // Unregister handler locally.
+            unregisterHandler(routineId, routine.hnd, true);
+
             try {
                 ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
             }