You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 13:12:39 UTC

[11/17] ignite git commit: Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry).

Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry).


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dee61900
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dee61900
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dee61900

Branch: refs/heads/ignite-1232
Commit: dee61900c26b1f2a0a84d5e400001fecad545ada
Parents: 10214cc
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 25 12:54:11 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 25 12:54:11 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 43 +++++++++++---------
 1 file changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dee61900/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4397f69..1938edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -737,6 +737,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
             assert entry != null;
 
+            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+                assert entry.updateCounter() == 0L : entry;
+
+                return F.asList(entry);
+            }
+
             List<CacheContinuousQueryEntry> entries;
 
             synchronized (pendingEvts) {
@@ -991,28 +997,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         routineId,
                         t.get1());
 
-                    Collection<ClusterNode> nodes = new HashSet<>();
-
-                    for (AffinityTopologyVersion topVer : t.get2())
-                        nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer));
-
-                    for (ClusterNode node : nodes) {
-                        if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
-                            try {
-                                cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
-                            }
-                            catch (ClusterTopologyCheckedException e) {
-                                IgniteLogger log = ctx.log(getClass());
+                    for (AffinityTopologyVersion topVer : t.get2()) {
+                        for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
+                            if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
+                                try {
+                                    cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+                                }
+                                catch (ClusterTopologyCheckedException e) {
+                                    IgniteLogger log = ctx.log(getClass());
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to send acknowledge message, node left " +
-                                        "[msg=" + msg + ", node=" + node + ']');
-                            }
-                            catch (IgniteCheckedException e) {
-                                IgniteLogger log = ctx.log(getClass());
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send acknowledge message, node left " +
+                                            "[msg=" + msg + ", node=" + node + ']');
+                                }
+                                catch (IgniteCheckedException e) {
+                                    IgniteLogger log = ctx.log(getClass());
 
-                                U.error(log, "Failed to send acknowledge message " +
-                                    "[msg=" + msg + ", node=" + node + ']', e);
+                                    U.error(log, "Failed to send acknowledge message " +
+                                        "[msg=" + msg + ", node=" + node + ']', e);
+                                }
                             }
                         }
                     }