You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/17 22:24:08 UTC
[11/15] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d20b76c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d20b76c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d20b76c4
Branch: refs/heads/ignite-5075
Commit: d20b76c43d242bb9270e606688bc3adba5e61075
Parents: 194446d
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 17 21:55:17 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 17 21:55:17 2017 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 37 +++++++++++---------
1 file changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d20b76c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..2802217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -567,10 +567,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (!cctx.isLocal()) {
- cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+ AffinityTopologyVersion topVer = initTopVer;
+
+ cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
- getOrCreatePartitionRecovery(ctx, partId);
+ getOrCreatePartitionRecovery(ctx, partId, topVer);
}
}
@@ -736,7 +738,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
}
- PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+ PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
return rec.collectEntries(e, cctx, cache);
}
@@ -869,37 +871,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/**
* @param ctx Context.
* @param partId Partition id.
+ * @param topVer Topology version for current operation.
* @return Partition recovery.
*/
- @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+ @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+ int partId,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
PartitionRecovery rec = rcvs.get(partId);
if (rec == null) {
T2<Long, Long> partCntrs = null;
- AffinityTopologyVersion initTopVer0 = initTopVer;
+ Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
- if (initTopVer0 != null) {
+ if (initUpdCntrsPerNode != null) {
GridCacheContext<K, V> cctx = cacheContext(ctx);
GridCacheAffinityManager aff = cctx.affinity();
- if (initUpdCntrsPerNode != null) {
- for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
- Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
+ for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+ Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
- if (map != null) {
- partCntrs = map.get(partId);
+ if (map != null) {
+ partCntrs = map.get(partId);
- break;
- }
+ break;
}
}
- else if (initUpdCntrs != null)
- partCntrs = initUpdCntrs.get(partId);
}
+ else if (initUpdCntrs != null)
+ partCntrs = initUpdCntrs.get(partId);
- rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+ rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs != null ? partCntrs.get2() : null);
PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);