You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 13:12:39 UTC
[11/17] ignite git commit: Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry).
Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dee61900
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dee61900
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dee61900
Branch: refs/heads/ignite-1232
Commit: dee61900c26b1f2a0a84d5e400001fecad545ada
Parents: 10214cc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 25 12:54:11 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 25 12:54:11 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 43 +++++++++++---------
1 file changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dee61900/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4397f69..1938edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -737,6 +737,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
assert entry != null;
+ if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+ assert entry.updateCounter() == 0L : entry;
+
+ return F.asList(entry);
+ }
+
List<CacheContinuousQueryEntry> entries;
synchronized (pendingEvts) {
@@ -991,28 +997,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
routineId,
t.get1());
- Collection<ClusterNode> nodes = new HashSet<>();
-
- for (AffinityTopologyVersion topVer : t.get2())
- nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer));
-
- for (ClusterNode node : nodes) {
- if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
- try {
- cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
- }
- catch (ClusterTopologyCheckedException e) {
- IgniteLogger log = ctx.log(getClass());
+ for (AffinityTopologyVersion topVer : t.get2()) {
+ for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
+ if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
+ try {
+ cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteLogger log = ctx.log(getClass());
- if (log.isDebugEnabled())
- log.debug("Failed to send acknowledge message, node left " +
- "[msg=" + msg + ", node=" + node + ']');
- }
- catch (IgniteCheckedException e) {
- IgniteLogger log = ctx.log(getClass());
+ if (log.isDebugEnabled())
+ log.debug("Failed to send acknowledge message, node left " +
+ "[msg=" + msg + ", node=" + node + ']');
+ }
+ catch (IgniteCheckedException e) {
+ IgniteLogger log = ctx.log(getClass());
- U.error(log, "Failed to send acknowledge message " +
- "[msg=" + msg + ", node=" + node + ']', e);
+ U.error(log, "Failed to send acknowledge message " +
+ "[msg=" + msg + ", node=" + node + ']', e);
+ }
}
}
}