You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/17 22:24:08 UTC

[11/15] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d20b76c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d20b76c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d20b76c4

Branch: refs/heads/ignite-5075
Commit: d20b76c43d242bb9270e606688bc3adba5e61075
Parents: 194446d
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 17 21:55:17 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 17 21:55:17 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 37 +++++++++++---------
 1 file changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d20b76c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..2802217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -567,10 +567,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         GridCacheContext<K, V> cctx = cacheContext(ctx);
 
         if (!cctx.isLocal()) {
-            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+            AffinityTopologyVersion topVer = initTopVer;
+
+            cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
 
             for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
-                getOrCreatePartitionRecovery(ctx, partId);
+                getOrCreatePartitionRecovery(ctx, partId, topVer);
         }
     }
 
@@ -736,7 +738,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
         }
 
-        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
 
         return rec.collectEntries(e, cctx, cache);
     }
@@ -869,37 +871,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * @param ctx Context.
      * @param partId Partition id.
+     * @param topVer Topology version for current operation.
      * @return Partition recovery.
      */
-    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+        int partId,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
         PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
             T2<Long, Long> partCntrs = null;
 
-            AffinityTopologyVersion initTopVer0 = initTopVer;
+            Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
 
-            if (initTopVer0 != null) {
+            if (initUpdCntrsPerNode != null) {
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 GridCacheAffinityManager aff = cctx.affinity();
 
-                if (initUpdCntrsPerNode != null) {
-                    for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
-                        Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
+                for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+                    Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
 
-                        if (map != null) {
-                            partCntrs = map.get(partId);
+                    if (map != null) {
+                        partCntrs = map.get(partId);
 
-                            break;
-                        }
+                        break;
                     }
                 }
-                else if (initUpdCntrs != null)
-                    partCntrs = initUpdCntrs.get(partId);
             }
+            else if (initUpdCntrs != null)
+                partCntrs = initUpdCntrs.get(partId);
 
-            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
                 partCntrs != null ? partCntrs.get2() : null);
 
             PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);