You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/15 16:05:27 UTC
[21/21] ignite git commit: IGNITE-5872 - Backward compatibility
IGNITE-5872 - Backward compatibility
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dbcae598
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dbcae598
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dbcae598
Branch: refs/heads/ignite-5872
Commit: dbcae598db90135f0a50a649fd4ee56408589596
Parents: 4834b87
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Aug 15 19:04:41 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Aug 15 19:04:41 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 23 ++++++++++++++----
.../discovery/GridDiscoveryManager.java | 16 +++++++++++--
.../GridCachePartitionExchangeManager.java | 25 +++++++++++++++-----
.../CachePartitionFullCountersMap.java | 7 ++++++
.../GridDhtPartitionsExchangeFuture.java | 14 +++++++----
.../preloader/GridDhtPartitionsFullMessage.java | 15 +++++++++++-
.../IgniteDhtPartitionCountersMap.java | 24 +++++++++++++++++++
.../IgniteDhtPartitionCountersPrimitiveMap.java | 23 ++++++++++++++++++
8 files changed, 130 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..4a0185c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
/**
@@ -81,6 +82,9 @@ public class DiscoCache {
/** Alive nodes. */
private final Set<UUID> alives = new GridConcurrentHashSet<>();
+ /** Minimal product version seen for remote nodes. */
+ private IgniteProductVersion minRmtProdVer;
+
/**
* @param state Current cluster state.
* @param loc Local node.
@@ -95,6 +99,7 @@ public class DiscoCache {
* @param cacheGrpAffNodes Affinity nodes by cache group ID.
* @param nodeMap Node map.
* @param alives Alive nodes.
+ * @param minRmtProdVer Minimal product version seen for remote nodes.
*/
DiscoCache(
DiscoveryDataClusterState state,
@@ -109,7 +114,9 @@ public class DiscoCache {
Map<Integer, List<ClusterNode>> allCacheNodes,
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
- Set<UUID> alives) {
+ Set<UUID> alives,
+ IgniteProductVersion minRmtProdVer
+ ) {
this.state = state;
this.loc = loc;
this.rmtNodes = rmtNodes;
@@ -123,6 +130,7 @@ public class DiscoCache {
this.cacheGrpAffNodes = cacheGrpAffNodes;
this.nodeMap = nodeMap;
this.alives.addAll(alives);
+ this.minRmtProdVer = minRmtProdVer;
}
/**
@@ -209,7 +217,7 @@ public class DiscoCache {
/**
* @return Oldest alive server node.
*/
- public @Nullable ClusterNode oldestAliveServerNode(){
+ @Nullable public ClusterNode oldestAliveServerNode(){
Iterator<ClusterNode> it = aliveServerNodes().iterator();
return it.hasNext() ? it.next() : null;
}
@@ -217,7 +225,7 @@ public class DiscoCache {
/**
* @return Oldest alive server node with at least one cache configured.
*/
- public @Nullable ClusterNode oldestAliveServerNodeWithCache(){
+ @Nullable public ClusterNode oldestAliveServerNodeWithCache(){
Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator();
return it.hasNext() ? it.next() : null;
}
@@ -254,7 +262,7 @@ public class DiscoCache {
* @param id Node ID.
* @return Node.
*/
- public @Nullable ClusterNode node(UUID id) {
+ @Nullable public ClusterNode node(UUID id) {
return nodeMap.get(id);
}
@@ -280,6 +288,13 @@ public class DiscoCache {
}
/**
+ * @return Minimum product version seen among remote nodes (may be {@code null} if there are no remote nodes).
+ */
+ public IgniteProductVersion minimumRemoteNodesVersion() {
+ return minRmtProdVer;
+ }
+
+ /**
* @param nodes Cluster nodes.
* @return Empty collection if nodes list is {@code null}
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 834ba4d..d7a9b4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
@@ -2165,6 +2166,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+ IgniteProductVersion minProdVer = null;
+
for (ClusterNode node : topSnapshot) {
if (alive(node))
alives.add(node.id());
@@ -2174,9 +2177,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else {
allNodes.add(node);
- if (!node.isLocal())
+ if (!node.isLocal()) {
rmtNodes.add(node);
+ if (minProdVer == null)
+ minProdVer = node.version();
+ else {
+ if (minProdVer.compareTo(node.version()) > 0)
+ minProdVer = node.version();
+ }
+ }
+
if (!CU.clientNode(node))
srvNodes.add(node);
}
@@ -2243,7 +2254,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Collections.unmodifiableMap(allCacheNodes),
Collections.unmodifiableMap(cacheGrpAffNodes),
Collections.unmodifiableMap(nodeMap),
- alives);
+ alives,
+ minProdVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2880860..4001922 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -118,6 +119,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.PRIMITIVE_UPD_CNTRS_SINCE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
@@ -929,9 +931,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param nodes Nodes.
* @param msgTopVer Topology version. Will be added to full message.
*/
- private void sendAllPartitions(Collection<ClusterNode> nodes,
- AffinityTopologyVersion msgTopVer) {
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null);
+ private void sendAllPartitions(
+ Collection<ClusterNode> nodes,
+ AffinityTopologyVersion msgTopVer
+ ) {
+ IgniteProductVersion minVer = cctx.discovery().discoCache(msgTopVer).minimumRemoteNodesVersion();
+
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(
+ true,
+ minVer.compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0,
+ null,
+ null,
+ null,
+ null);
m.topologyVersion(msgTopVer);
@@ -966,6 +978,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
boolean compress,
+ boolean compatibility,
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -975,8 +988,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
lastVer,
exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE,
partHistSuppliers,
- partsToReload
- );
+ partsToReload,
+ compatibility);
m.compress(compress);
@@ -1110,7 +1123,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean sndCounters,
ExchangeActions exchActions
) {
- boolean compat = targetNode.version().compareTo(GridDhtPartitionsExchangeFuture.PRIMITIVE_UPD_CNTRS_SINCE) < 0;
+ boolean compat = targetNode.version().compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0;
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
clientOnlyExchange,
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 1384a55..fde0599 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -50,6 +50,13 @@ public class CachePartitionFullCountersMap implements Serializable {
}
/**
+ * @return Total number of partitions.
+ */
+ public int size() {
+ return initialUpdCntrs.length;
+ }
+
+ /**
* Gets an initial update counter by the partition ID.
*
* @param p Partition ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e8da0b9..d6183fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -118,7 +118,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
IgniteSystemProperties.getInteger(IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD, 0);
/** */
- public static IgniteProductVersion PRIMITIVE_UPD_CNTRS_SINCE = IgniteProductVersion.fromString("8.1.4");
+ public static final IgniteProductVersion PRIMITIVE_UPD_CNTRS_SINCE = IgniteProductVersion.fromString("8.1.4");
/** */
@GridToStringExclude
@@ -1167,11 +1167,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param compress Message compress flag.
* @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress, boolean compatibility) {
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
compress,
+ compatibility,
exchangeId(),
last != null ? last : cctx.versions().last(),
partHistSuppliers,
@@ -1188,7 +1189,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @throws IgniteCheckedException If failed.
*/
private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
- GridDhtPartitionsFullMessage m = createPartitionsMessage(true);
+ IgniteProductVersion minVer = cctx.discovery().discoCache(topologyVersion()).minimumRemoteNodesVersion();
+
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(true, minVer.compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0);
assert !nodes.contains(cctx.localNode());
@@ -1510,7 +1513,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
- GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+ IgniteProductVersion minVer = cctx.discovery().discoCache(topologyVersion()).minimumRemoteNodesVersion();
+
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(false,
+ minVer.compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0);
CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 4e04f19..8d259ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -260,7 +260,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
partsBytes0 = U.marshal(ctx, parts);
if (primPartCntrs != null && partCntrsBytes == null)
- partCntrsBytes0 = U.marshal(ctx, compatibilityMode ? convertToOld(primPartCntrs) : primPartCntrs);
+ partCntrsBytes0 = U.marshal(ctx, compatibilityMode ? primPartCntrs.toOldMap() : primPartCntrs);
if (partHistSuppliers != null && partHistSuppliersBytes == null)
partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers);
@@ -543,4 +543,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", parts != null ? parts.size() : 0,
"super", super.toString());
}
+
+ /**
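+ * @param obj Counters map in either the new ({@link IgniteDhtPartitionCountersPrimitiveMap}) or the old ({@link IgniteDhtPartitionCountersMap}) representation.
+ * @return Counters map in the new representation.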
+ */
+ private static IgniteDhtPartitionCountersPrimitiveMap convertToNew(Object obj) {
+ if (obj instanceof IgniteDhtPartitionCountersPrimitiveMap)
+ return (IgniteDhtPartitionCountersPrimitiveMap)obj;
+
+ IgniteDhtPartitionCountersMap old = (IgniteDhtPartitionCountersMap)obj;
+
+ return old.convertToNew();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 9db80ae..1e5c8e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -61,4 +61,28 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
return cntrMap;
}
+
+ /**
+ * @return This map converted to the new {@link IgniteDhtPartitionCountersPrimitiveMap} representation.
+ */
+ public synchronized IgniteDhtPartitionCountersPrimitiveMap convertToNew() {
+ IgniteDhtPartitionCountersPrimitiveMap res = new IgniteDhtPartitionCountersPrimitiveMap();
+
+ for (Map.Entry<Integer, Map<Integer, T2<Long, Long>>> entry : map.entrySet()) {
+ Map<Integer, T2<Long, Long>> val = entry.getValue();
+
+ CachePartitionFullCountersMap fullCntrsMap = new CachePartitionFullCountersMap(val.size());
+
+ for (Map.Entry<Integer, T2<Long, Long>> cacheEntry : val.entrySet()) {
+ int part = cacheEntry.getKey();
+
+ fullCntrsMap.initialUpdateCounter(part, cacheEntry.getValue().get1());
+ fullCntrsMap.updateCounter(part, cacheEntry.getValue().get2());
+ }
+
+ res.putIfAbsent(entry.getKey(), fullCntrsMap);
+ }
+
+ return res;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
index af9be7f..270d29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
@@ -21,6 +21,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Partition counters map.
@@ -59,4 +61,25 @@ public class IgniteDhtPartitionCountersPrimitiveMap implements Serializable {
return cntrMap;
}
+
+ /**
+ * @return Old map representation.
+ */
+ public synchronized IgniteDhtPartitionCountersMap toOldMap() {
+ IgniteDhtPartitionCountersMap res = new IgniteDhtPartitionCountersMap();
+
+ for (Map.Entry<Integer, CachePartitionFullCountersMap> entry : map.entrySet()) {
+ CachePartitionFullCountersMap val = entry.getValue();
+
+ Map<Integer, T2<Long, Long>> fullParts = U.newHashMap(val.size());
+
+ for (int i = 0; i < val.size(); i++) {
+ fullParts.put(i, new T2<>(val.initialUpdateCounter(i), val.updateCounter(i)));
+ }
+
+ res.putIfAbsent(entry.getKey(), fullParts);
+ }
+
+ return res;
+ }
}