You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/02/11 16:25:14 UTC

[3/3] ignite git commit: IGNITE-2468

IGNITE-2468


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8062a17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8062a17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8062a17

Branch: refs/heads/ignite-2468
Commit: d8062a17e03b46952d154b62aade46122b0321a9
Parents: 95da173
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Feb 11 18:23:17 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Feb 11 18:23:17 2016 +0300

----------------------------------------------------------------------
 .../internal/GridMessageListenHandler.java      | 18 ++++++++++++--
 .../continuous/GridContinuousProcessor.java     | 25 +++++++++++---------
 2 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8062a17/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 2e78cf5..bf81944 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler {
         this.pred = pred;
     }
 
+    /**
+     *
+     * @param orig Handler to be copied.
+     */
+    public GridMessageListenHandler(GridMessageListenHandler orig) {
+        assert orig != null;
+
+        this.clsName = orig.clsName;
+        this.depInfo = orig.depInfo;
+        this.pred = orig.pred;
+        this.predBytes = orig.predBytes;
+        this.topic = orig.topic;
+        this.topicBytes = orig.topicBytes;
+        this.depEnabled = false;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;
@@ -179,8 +195,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
             topic = ctx.config().getMarshaller().unmarshal(topicBytes, ldr);
 
         pred = ctx.config().getMarshaller().unmarshal(predBytes, ldr);
-
-        depEnabled = false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8062a17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 2b72239..ed6ff5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridMessageListenHandler;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -860,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
         }
 
+        GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+            new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+            hnd;
+
         if (node.isClient()) {
             Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
 
@@ -872,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
-                hnd,
+                hnd0,
                 data.bufferSize(),
                 data.interval(),
                 data.autoUnsubscribe()));
@@ -889,14 +894,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
                     !locInfos.containsKey(routineId)) {
-                    registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+                    registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
                 }
 
                 if (!data.autoUnsubscribe())
                     // Register routine locally.
                     locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
-                        prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
+                        prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -906,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd.isQuery()) {
+        if (hnd0.isQuery()) {
             GridCacheProcessor proc = ctx.cache();
 
             if (proc != null) {
-                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal()) {
                     Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
@@ -924,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             req.addError(ctx.localNodeId(), err);
 
         if (registered)
-            hnd.onListenerRegistered(routineId, ctx);
+            hnd0.onListenerRegistered(routineId, ctx);
     }
 
     /**
@@ -1115,9 +1120,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         try {
             remote = rmtInfos.remove(routineId);
 
-            if (remote == null)
-                // Removes routine at node started it when stopRoutine called from another node.
-                loc = locInfos.remove(routineId);
+            loc = locInfos.remove(routineId);
 
             if (remote == null)
                 stopped.add(routineId);
@@ -1128,8 +1131,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         if (remote != null)
             unregisterHandler(routineId, remote.hnd, false);
-
-        if (loc != null)
+        else
+            // Removes routine at node started it when stopRoutine called from another node.
             unregisterHandler(routineId, loc.hnd, false);
     }