You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/02/11 16:25:14 UTC
[3/3] ignite git commit: IGNITE-2468
IGNITE-2468
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8062a17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8062a17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8062a17
Branch: refs/heads/ignite-2468
Commit: d8062a17e03b46952d154b62aade46122b0321a9
Parents: 95da173
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Feb 11 18:23:17 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Feb 11 18:23:17 2016 +0300
----------------------------------------------------------------------
.../internal/GridMessageListenHandler.java | 18 ++++++++++++--
.../continuous/GridContinuousProcessor.java | 25 +++++++++++---------
2 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8062a17/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 2e78cf5..bf81944 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler {
this.pred = pred;
}
+ /**
+ * Copy constructor. Creates a new handler whose class name, deployment info, predicate,
+ * topic and their serialized byte forms are taken from {@code orig}. Fields are assigned
+ * directly (references are shared with {@code orig}, nothing is cloned).
+ * The {@code depEnabled} flag is NOT copied: the new handler always starts with it set to {@code false}.
+ *
+ * @param orig Handler to be copied.
+ */
+ public GridMessageListenHandler(GridMessageListenHandler orig) {
+ assert orig != null; // Only enforced when JVM assertions are enabled.
+
+ this.clsName = orig.clsName;
+ this.depInfo = orig.depInfo;
+ this.pred = orig.pred;
+ this.predBytes = orig.predBytes;
+ this.topic = orig.topic;
+ this.topicBytes = orig.topicBytes;
+ this.depEnabled = false; // Deliberately reset rather than taken from orig.depEnabled.
+ }
+
/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
@@ -179,8 +195,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
topic = ctx.config().getMarshaller().unmarshal(topicBytes, ldr);
pred = ctx.config().getMarshaller().unmarshal(predBytes, ldr);
-
- depEnabled = false;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8062a17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 2b72239..ed6ff5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridMessageListenHandler;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -860,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
}
+ GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+ new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+ hnd;
+
if (node.isClient()) {
Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
@@ -872,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
- hnd,
+ hnd0,
data.bufferSize(),
data.interval(),
data.autoUnsubscribe()));
@@ -889,14 +894,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
!locInfos.containsKey(routineId)) {
- registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+ registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
data.autoUnsubscribe(), false);
}
if (!data.autoUnsubscribe())
// Register routine locally.
locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
- prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
+ prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
}
catch (IgniteCheckedException e) {
err = e;
@@ -906,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Load partition counters.
- if (hnd.isQuery()) {
+ if (hnd0.isQuery()) {
GridCacheProcessor proc = ctx.cache();
if (proc != null) {
- GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal()) {
Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
@@ -924,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
req.addError(ctx.localNodeId(), err);
if (registered)
- hnd.onListenerRegistered(routineId, ctx);
+ hnd0.onListenerRegistered(routineId, ctx);
}
/**
@@ -1115,9 +1120,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
remote = rmtInfos.remove(routineId);
- if (remote == null)
- // Removes routine at node started it when stopRoutine called from another node.
- loc = locInfos.remove(routineId);
+ loc = locInfos.remove(routineId);
if (remote == null)
stopped.add(routineId);
@@ -1128,8 +1131,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (remote != null)
unregisterHandler(routineId, remote.hnd, false);
-
- if (loc != null)
+ else
+ // Removes routine at node started it when stopRoutine called from another node.
unregisterHandler(routineId, loc.hnd, false);
}