You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/11/29 10:28:50 UTC
[09/12] ignite git commit: Minor: moved custom events processing into
GridContinuousProcessor's methods.
Minor: moved custom events processing into GridContinuousProcessor's methods.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe806701
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe806701
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe806701
Branch: refs/heads/ignite-7016
Commit: fe806701ec42c951378d32ff931e98411260f997
Parents: 6afbc09
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 29 11:34:23 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 29 13:28:34 2017 +0300
----------------------------------------------------------------------
.../continuous/GridContinuousProcessor.java | 132 ++++++++++++-------
1 file changed, 85 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fe806701/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fa52be2..571d654 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -176,8 +176,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
ClusterNode snd,
StartRoutineDiscoveryMessage msg) {
- if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
- processStartRequest(snd, msg);
+ if (ctx.isStopping())
+ return;
+
+ processStartRequest(snd, msg);
}
});
@@ -186,39 +188,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
ClusterNode snd,
StartRoutineAckDiscoveryMessage msg) {
- StartFuture fut = startFuts.remove(msg.routineId());
-
- if (fut != null) {
- if (msg.errs().isEmpty()) {
- LocalRoutineInfo routine = locInfos.get(msg.routineId());
-
- // Update partition counters.
- if (routine != null && routine.handler().isQuery()) {
- Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode();
- Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
-
- GridCacheAdapter<Object, Object> interCache =
- ctx.cache().internalCache(routine.handler().cacheName());
-
- GridCacheContext cctx = interCache != null ? interCache.context() : null;
-
- if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
- cntrsPerNode.put(ctx.localNodeId(),
- toCountersMap(cctx.topology().localUpdateCounters(false)));
-
- routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
- }
-
- fut.onRemoteRegistered();
- }
- else {
- IgniteCheckedException firstEx = F.first(msg.errs().values());
-
- fut.onDone(firstEx);
+ if (ctx.isStopping())
+ return;
- stopRoutine(msg.routineId());
- }
- }
+ processStartAckRequest(topVer, msg);
}
});
@@ -227,16 +200,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
ClusterNode snd,
StopRoutineDiscoveryMessage msg) {
- if (!snd.id().equals(ctx.localNodeId())) {
- UUID routineId = msg.routineId();
-
- unregisterRemote(routineId);
- }
+ if (ctx.isStopping())
+ return;
- for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
- if (clientInfo.remove(msg.routineId()) != null)
- break;
- }
+ processStopRequest(snd, msg);
}
});
@@ -245,10 +212,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
ClusterNode snd,
StopRoutineAckDiscoveryMessage msg) {
- StopFuture fut = stopFuts.remove(msg.routineId());
+ if (ctx.isStopping())
+ return;
- if (fut != null)
- fut.onDone();
+ processStopAckRequest(msg);
}
});
@@ -459,7 +426,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
if (log.isDebugEnabled()) {
- log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
+ log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
", loc=" + ctx.localNodeId() +
", data=" + data.joiningNodeData() +
']');
@@ -976,11 +943,82 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param msg Message.
+ */
+ private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) {
+ StopFuture fut = stopFuts.remove(msg.routineId());
+
+ if (fut != null)
+ fut.onDone();
+ }
+
+ /**
+ * @param snd Sender node.
+ * @param msg Message.
+ */
+ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+ if (!snd.id().equals(ctx.localNodeId())) {
+ UUID routineId = msg.routineId();
+
+ unregisterRemote(routineId);
+ }
+
+ for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+ if (clientInfo.remove(msg.routineId()) != null)
+ break;
+ }
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param msg Message.
+ */
+ private void processStartAckRequest(AffinityTopologyVersion topVer,
+ StartRoutineAckDiscoveryMessage msg) {
+ StartFuture fut = startFuts.remove(msg.routineId());
+
+ if (fut != null) {
+ if (msg.errs().isEmpty()) {
+ LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+ // Update partition counters.
+ if (routine != null && routine.handler().isQuery()) {
+ Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode();
+ Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
+
+ GridCacheAdapter<Object, Object> interCache =
+ ctx.cache().internalCache(routine.handler().cacheName());
+
+ GridCacheContext cctx = interCache != null ? interCache.context() : null;
+
+ if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+ cntrsPerNode.put(ctx.localNodeId(),
+ toCountersMap(cctx.topology().localUpdateCounters(false)));
+
+ routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
+ }
+
+ fut.onRemoteRegistered();
+ }
+ else {
+ IgniteCheckedException firstEx = F.first(msg.errs().values());
+
+ fut.onDone(firstEx);
+
+ stopRoutine(msg.routineId());
+ }
+ }
+ }
+
+ /**
* @param node Sender.
* @param req Start request.
*/
private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
UUID routineId = req.routineId();
+ if (node.id().equals(ctx.localNodeId()))
+ return;
+
StartRequestData data = req.startRequestData();
GridContinuousHandler hnd = data.handler();