You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/25 08:39:46 UTC
[04/18] ignite git commit: ignite-5872 Fixed backward compatibility
ignite-5872 Fixed backward compatibility
(cherry picked from commit cca9117)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/129be29e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/129be29e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/129be29e
Branch: refs/heads/ignite-6149
Commit: 129be29e96cfaba3bb7645c66981de62646f820c
Parents: fa42218
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 21 18:39:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 23 12:22:15 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 47 ++++++++---
.../dht/GridClientPartitionTopology.java | 9 +++
.../dht/GridDhtPartitionTopology.java | 5 ++
.../dht/GridDhtPartitionTopologyImpl.java | 5 ++
.../CachePartitionFullCountersMap.java | 36 +++++++++
.../CachePartitionPartialCountersMap.java | 23 ++++++
.../GridDhtPartitionsExchangeFuture.java | 41 +++++++---
.../preloader/GridDhtPartitionsFullMessage.java | 84 +++++++++++++++++---
.../GridDhtPartitionsSingleMessage.java | 23 ++++--
.../IgniteDhtPartitionCountersMap.java | 14 ++--
.../IgniteDhtPartitionCountersMap2.java | 69 ++++++++++++++++
11 files changed, 315 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 200f677..984721b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -65,6 +65,8 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -973,7 +975,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private void sendAllPartitions(Collection<ClusterNode> nodes,
AffinityTopologyVersion msgTopVer) {
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null);
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null);
m.topologyVersion(msgTopVer);
@@ -1000,6 +1002,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/
* finishUnmarshall methods are called).
+ * @param newCntrMap {@code True} if it is possible to use {@link CachePartitionFullCountersMap}.
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
* @param partHistSuppliers Partition history suppliers map.
@@ -1008,6 +1011,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
boolean compress,
+ boolean newCntrMap,
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -1046,8 +1050,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
affCache.similarAffinityKey());
}
- if (exchId != null)
- m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters());
+ if (exchId != null) {
+ CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();
+
+ if (newCntrMap)
+ m.addPartitionUpdateCounters(grp.groupId(), cntrsMap);
+ else {
+ m.addPartitionUpdateCounters(grp.groupId(),
+ CachePartitionFullCountersMap.toCountersMap(cntrsMap));
+ }
+ }
}
}
@@ -1064,8 +1076,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top.similarAffinityKey());
}
- if (exchId != null)
- m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters());
+ if (exchId != null) {
+ CachePartitionFullCountersMap cntrsMap = top.fullUpdateCounters();
+
+ if (newCntrMap)
+ m.addPartitionUpdateCounters(top.groupId(), cntrsMap);
+ else
+ m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));
+ }
}
return m;
@@ -1119,6 +1137,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
cctx.kernalContext().clientNode(),
false,
+ false,
null);
if (log.isDebugEnabled())
@@ -1141,12 +1160,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param exchangeId Exchange ID.
* @param clientOnlyExchange Client exchange flag.
* @param sndCounters {@code True} if need send partition update counters.
+ * @param newCntrMap {@code True} if it is possible to use {@link CachePartitionPartialCountersMap}.
* @return Message.
*/
public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
@Nullable GridDhtPartitionExchangeId exchangeId,
boolean clientOnlyExchange,
boolean sndCounters,
+ boolean newCntrMap,
ExchangeActions exchActions
) {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
@@ -1167,8 +1188,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
locMap,
grp.affinity().similarAffinityKey());
- if (sndCounters)
- m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters(true));
+ if (sndCounters) {
+ CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true);
+
+ m.addPartitionUpdateCounters(grp.groupId(),
+ newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+ }
}
}
@@ -1185,8 +1210,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
locMap,
top.similarAffinityKey());
- if (sndCounters)
- m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters(true));
+ if (sndCounters) {
+ CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true);
+
+ m.addPartitionUpdateCounters(top.groupId(),
+ newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+ }
}
return m;
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 77792c7..c8856fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -115,6 +115,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** */
private volatile DiscoCache discoCache;
+ /** */
+ private final int parts;
+
/**
* @param cctx Context.
* @param grpId Group ID.
@@ -130,6 +133,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
this.cctx = cctx;
this.grpId = grpId;
this.similarAffKey = similarAffKey;
+ this.parts = parts;
topVer = AffinityTopologyVersion.NONE;
@@ -142,6 +146,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
cntrMap = new CachePartitionFullCountersMap(parts);
}
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return parts;
+ }
+
/**
* @return Key to find caches with similar affinity.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 22205ea..4ae68ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -43,6 +43,11 @@ import org.jetbrains.annotations.Nullable;
@GridToStringExclude
public interface GridDhtPartitionTopology {
/**
+ * @return Total cache partitions.
+ */
+ public int partitions();
+
+ /**
* Locks the topology, usually during mapping on locks or transactions.
*/
public void readLock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 16fe012..f25ae21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -161,6 +161,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public int partitions() {
+ return grp.affinityFunction().partitions();
+ }
+
+ /** {@inheritDoc} */
@Override public int groupId() {
return grp.groupId();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 1384a55..ebc993c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
@@ -96,4 +99,37 @@ public class CachePartitionFullCountersMap implements Serializable {
Arrays.fill(initialUpdCntrs, 0);
Arrays.fill(updCntrs, 0);
}
+
+ /**
+ * @param map Full counters map.
+ * @return Regular Java map with counters.
+ */
+ public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionFullCountersMap map) {
+ int partsCnt = map.updCntrs.length;
+
+ Map<Integer, T2<Long, Long>> map0 = U.newHashMap(partsCnt);
+
+ for (int p = 0; p < partsCnt; p++)
+ map0.put(p, new T2<>(map.initialUpdCntrs[p], map.updCntrs[p]));
+
+ return map0;
+ }
+
+ /**
+ * @param map Regular Java map with counters.
+ * @param partsCnt Total cache partitions.
+ * @return Full counters map.
+ */
+ static CachePartitionFullCountersMap fromCountersMap(Map<Integer, T2<Long, Long>> map, int partsCnt) {
+ CachePartitionFullCountersMap map0 = new CachePartitionFullCountersMap(partsCnt);
+
+ for (Map.Entry<Integer, T2<Long, Long>> e : map.entrySet()) {
+ T2<Long, Long> cntrs = e.getValue();
+
+ map0.initialUpdCntrs[e.getKey()] = cntrs.get1();
+ map0.updCntrs[e.getKey()] = cntrs.get2();
+ }
+
+ return map0;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index 851ffed..83c0231 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -21,8 +21,10 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
/**
*
@@ -32,6 +34,9 @@ public class CachePartitionPartialCountersMap implements Serializable {
private static final long serialVersionUID = 0L;
/** */
+ static final IgniteProductVersion PARTIAL_COUNTERS_MAP_SINCE = IgniteProductVersion.fromString("2.1.4");
+
+ /** */
public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap();
/** */
@@ -158,4 +163,22 @@ public class CachePartitionPartialCountersMap implements Serializable {
return res;
}
+
+ /**
+ * @param map Partition ID to partition counters map.
+ * @param partsCnt Total cache partitions.
+ * @return Partial local counters map.
+ */
+ static CachePartitionPartialCountersMap fromCountersMap(Map<Integer, T2<Long, Long>> map, int partsCnt) {
+ CachePartitionPartialCountersMap map0 = new CachePartitionPartialCountersMap(partsCnt);
+
+ TreeMap<Integer, T2<Long, Long>> sorted = new TreeMap<>(map);
+
+ for (Map.Entry<Integer, T2<Long, Long>> e : sorted.entrySet())
+ map0.add(e.getKey(), e.getValue().get1(), e.getValue().get2());
+
+ map0.trim();
+
+ return map0;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ceb5abc..8e0deb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -106,6 +107,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
/**
* Future for exchanging partition maps.
@@ -1231,6 +1233,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
false,
true,
+ node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
exchActions);
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
@@ -1258,13 +1261,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param compress Message compress flag.
+ * @param newCntrMap {@code True} if it is possible to use {@link CachePartitionFullCountersMap}.
* @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress,
+ boolean newCntrMap) {
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
compress,
+ newCntrMap,
exchangeId(),
last != null ? last : cctx.versions().last(),
partHistSuppliers,
@@ -1797,9 +1803,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (finishState0 == null) {
assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this;
+ ClusterNode node = cctx.node(nodeId);
+
+ if (node == null)
+ return;
+
finishState0 = new FinishState(cctx.localNodeId(),
initialVersion(),
- createPartitionsMessage(true));
+ createPartitionsMessage(true, node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0));
}
sendAllPartitionsToNode(finishState0, msg, nodeId);
@@ -1937,7 +1948,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
- GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(false, false);
CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
@@ -1959,7 +1970,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, Long> minCntrs = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
- CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId());
+ CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId(),
+ top.partitions());
assert nodeCntrs != null;
@@ -2235,7 +2247,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
GridDhtPartitionTopology top = grp != null ? grp.topology() :
cctx.exchange().clientTopology(grpId);
- CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId);
+ CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId,
+ top.partitions());
if (cntrs != null)
top.collectUpdateCounters(cntrs);
@@ -2283,7 +2296,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.versions().onExchange(lastVer.get().order());
- GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+ IgniteProductVersion minVer = exchCtx.events().discoveryCache().minimumNodeVersion();
+
+ GridDhtPartitionsFullMessage msg = createPartitionsMessage(true,
+ minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0);
if (exchCtx.mergeExchanges()) {
assert !centralizedAff;
@@ -2571,6 +2587,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
msg.restoreExchangeId(),
cctx.kernalContext().clientNode(),
true,
+ node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
exchActions);
if (localJoinExchange() && finishState0 == null)
@@ -2745,11 +2762,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
- CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId);
-
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
if (grp != null) {
+ CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+ grp.topology().partitions());
+
grp.topology().update(resTopVer,
entry.getValue(),
cntrMap,
@@ -2760,7 +2778,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal()) {
- cctx.exchange().clientTopology(grpId).update(resTopVer,
+ GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId);
+
+ CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+ top.partitions());
+
+ top.update(resTopVer,
entry.getValue(),
cntrMap,
Collections.<Integer>emptySet(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 2bb19cd..edbfc23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -68,6 +69,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** Serialized partitions counters. */
private byte[] partCntrsBytes;
+ /** Partitions update counters. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private IgniteDhtPartitionCountersMap2 partCntrs2;
+
+ /** Serialized partitions counters ({@link #partCntrs2}). */
+ private byte[] partCntrsBytes2;
+
/** Partitions history suppliers. */
@GridToStringInclude
@GridDirectTransient
@@ -149,6 +158,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
cp.partsBytes = partsBytes;
cp.partCntrs = partCntrs;
cp.partCntrsBytes = partCntrsBytes;
+ cp.partCntrs2 = partCntrs2;
+ cp.partCntrsBytes2 = partCntrsBytes2;
cp.partHistSuppliers = partHistSuppliers;
cp.partHistSuppliersBytes = partHistSuppliersBytes;
cp.partsToReload = partsToReload;
@@ -275,7 +286,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
* @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
+ public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
if (partCntrs == null)
partCntrs = new IgniteDhtPartitionCountersMap();
@@ -284,10 +295,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @param grpId Cache group ID.
+ * @param cntrMap Partition update counters.
+ */
+ public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
+ if (partCntrs2 == null)
+ partCntrs2 = new IgniteDhtPartitionCountersMap2();
+
+ partCntrs2.putIfAbsent(grpId, cntrMap);
+ }
+
+ /**
+ * @param grpId Cache group ID.
+ * @param partsCnt Total cache partitions.
* @return Partition update counters.
*/
- public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
- return partCntrs == null ? null : partCntrs.get(grpId);
+ public CachePartitionFullCountersMap partitionUpdateCounters(int grpId, int partsCnt) {
+ if (partCntrs2 != null)
+ return partCntrs2.get(grpId);
+
+ if (partCntrs == null)
+ return null;
+
+ Map<Integer, T2<Long, Long>> map = partCntrs.get(grpId);
+
+ return map != null ? CachePartitionFullCountersMap.fromCountersMap(map, partsCnt) : null;
}
/**
@@ -327,6 +358,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
(partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) ||
+ (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null) ||
(partHistSuppliers != null && partHistSuppliersBytes == null) ||
(partsToReload != null && partsToReloadBytes == null) ||
(!F.isEmpty(errs) && errsBytes == null);
@@ -334,6 +366,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (marshal) {
byte[] partsBytes0 = null;
byte[] partCntrsBytes0 = null;
+ byte[] partCntrsBytes20 = null;
byte[] partHistSuppliersBytes0 = null;
byte[] partsToReloadBytes0 = null;
byte[] errsBytes0 = null;
@@ -344,6 +377,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null)
partCntrsBytes0 = U.marshal(ctx, partCntrs);
+ if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null)
+ partCntrsBytes20 = U.marshal(ctx, partCntrs2);
+
if (partHistSuppliers != null && partHistSuppliersBytes == null)
partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers);
@@ -359,12 +395,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
try {
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+ byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20);
byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0);
byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
byte[] exsBytesZip = U.zip(errsBytes0);
partsBytes0 = partsBytesZip;
partCntrsBytes0 = partCntrsBytesZip;
+ partCntrsBytes20 = partCntrsBytes2Zip;
partHistSuppliersBytes0 = partHistSuppliersBytesZip;
partsToReloadBytes0 = partsToReloadBytesZip;
errsBytes0 = exsBytesZip;
@@ -378,6 +416,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
partsBytes = partsBytes0;
partCntrsBytes = partCntrsBytes0;
+ partCntrsBytes2 = partCntrsBytes20;
partHistSuppliersBytes = partHistSuppliersBytes0;
partsToReloadBytes = partsToReloadBytes0;
errsBytes = errsBytes0;
@@ -446,6 +485,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
+ if (partCntrsBytes2 != null && partCntrs2 == null) {
+ if (compressed())
+ partCntrs2 = U.unmarshalZip(ctx.marshaller(), partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ else
+ partCntrs2 = U.unmarshal(ctx, partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
if (partHistSuppliersBytes != null && partHistSuppliers == null) {
if (compressed())
partHistSuppliers = U.unmarshalZip(ctx.marshaller(), partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
@@ -520,30 +566,36 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
writer.incrementState();
case 10:
- if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+ if (!writer.writeByteArray("partCntrsBytes2", partCntrsBytes2))
return false;
writer.incrementState();
case 11:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
return false;
writer.incrementState();
case 12:
- if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+ if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
writer.incrementState();
case 13:
- if (!writer.writeMessage("resTopVer", resTopVer))
+ if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
return false;
writer.incrementState();
case 14:
+ if (!writer.writeMessage("resTopVer", resTopVer))
+ return false;
+
+ writer.incrementState();
+
+ case 15:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -606,7 +658,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 10:
- partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+ partCntrsBytes2 = reader.readByteArray("partCntrsBytes2");
if (!reader.isLastRead())
return false;
@@ -614,7 +666,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 11:
- partsBytes = reader.readByteArray("partsBytes");
+ partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
if (!reader.isLastRead())
return false;
@@ -622,7 +674,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 12:
- partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+ partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
return false;
@@ -630,7 +682,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 13:
- resTopVer = reader.readMessage("resTopVer");
+ partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
if (!reader.isLastRead())
return false;
@@ -638,6 +690,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 14:
+ resTopVer = reader.readMessage("resTopVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -657,7 +717,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 16;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 44815ca..215152d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -61,7 +62,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Partitions update counters. */
@GridToStringInclude
@GridDirectTransient
- private Map<Integer, CachePartitionPartialCountersMap> partCntrs;
+ private Map<Integer, Object> partCntrs;
/** Serialized partitions counters. */
private byte[] partCntrsBytes;
@@ -189,7 +190,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
* @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) {
+ public void addPartitionUpdateCounters(int grpId, Object cntrMap) {
if (partCntrs == null)
partCntrs = new HashMap<>();
@@ -198,12 +199,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/**
* @param grpId Cache group ID.
+ * @param partsCnt Total cache partitions; passed to {@code fromCountersMap} when the stored counters are a plain {@code Map}.
* @return Partition update counters.
*/
- public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) {
- CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId);
+ @SuppressWarnings("unchecked")
+ public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId, int partsCnt) {
+ Object res = partCntrs == null ? null : partCntrs.get(grpId);
- return res == null ? CachePartitionPartialCountersMap.EMPTY : res;
+ if (res == null)
+ return CachePartitionPartialCountersMap.EMPTY;
+
+ if (res instanceof CachePartitionPartialCountersMap)
+ return (CachePartitionPartialCountersMap)res;
+
+ assert res instanceof Map : res;
+
+ Map<Integer, T2<Long, Long>> map = (Map<Integer, T2<Long, Long>>)res;
+
+ return CachePartitionPartialCountersMap.fromCountersMap(map, partsCnt);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index e7954d9..dc2fbf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -19,8 +19,10 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
/**
* Partition counters map.
@@ -30,7 +32,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
private static final long serialVersionUID = 0L;
/** */
- private Map<Integer, CachePartitionFullCountersMap> map;
+ private Map<Integer, Map<Integer, T2<Long, Long>>> map;
/**
* @return {@code True} if map is empty.
@@ -43,7 +45,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
* @param cacheId Cache ID.
* @param cntrMap Counters map.
*/
- public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
+ public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
if (map == null)
map = new HashMap<>();
@@ -55,14 +57,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
* @param cacheId Cache ID.
* @return Counters map.
*/
- public synchronized CachePartitionFullCountersMap get(int cacheId) {
+ public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) {
if (map == null)
- return null;
+ map = new HashMap<>();
- CachePartitionFullCountersMap cntrMap = map.get(cacheId);
+ Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId);
if (cntrMap == null)
- return null;
+ return Collections.emptyMap();
return cntrMap;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java
new file mode 100644
index 0000000..d1e6d99
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Partition counters map: holds one {@link CachePartitionFullCountersMap} per cache ID; every method is synchronized.
+ */
+public class IgniteDhtPartitionCountersMap2 implements Serializable {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Counters maps keyed by cache ID; stays {@code null} until the first {@link #putIfAbsent} call. */
+ private Map<Integer, CachePartitionFullCountersMap> map;
+
+ /**
+ * @return {@code True} if the backing map is {@code null} or has no entries.
+ */
+ public synchronized boolean empty() {
+ return map == null || map.isEmpty();
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param cntrMap Counters map; ignored if a mapping for {@code cacheId} already exists.
+ */
+ public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
+ if (map == null)
+ map = new HashMap<>();
+
+ if (!map.containsKey(cacheId))
+ map.put(cacheId, cntrMap);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return Counters map, or {@code null} if nothing is stored for {@code cacheId}.
+ */
+ public synchronized CachePartitionFullCountersMap get(int cacheId) {
+ if (map == null)
+ return null;
+
+ CachePartitionFullCountersMap cntrMap = map.get(cacheId);
+
+ if (cntrMap == null)
+ return null;
+
+ return cntrMap;
+ }
+}