You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/15 16:05:27 UTC

[21/21] ignite git commit: IGNITE-5872 - Backward compatibility

IGNITE-5872 - Backward compatibility


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dbcae598
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dbcae598
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dbcae598

Branch: refs/heads/ignite-5872
Commit: dbcae598db90135f0a50a649fd4ee56408589596
Parents: 4834b87
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Aug 15 19:04:41 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Aug 15 19:04:41 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 23 ++++++++++++++----
 .../discovery/GridDiscoveryManager.java         | 16 +++++++++++--
 .../GridCachePartitionExchangeManager.java      | 25 +++++++++++++++-----
 .../CachePartitionFullCountersMap.java          |  7 ++++++
 .../GridDhtPartitionsExchangeFuture.java        | 14 +++++++----
 .../preloader/GridDhtPartitionsFullMessage.java | 15 +++++++++++-
 .../IgniteDhtPartitionCountersMap.java          | 24 +++++++++++++++++++
 .../IgniteDhtPartitionCountersPrimitiveMap.java | 23 ++++++++++++++++++
 8 files changed, 130 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..4a0185c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -81,6 +82,9 @@ public class DiscoCache {
     /** Alive nodes. */
     private final Set<UUID> alives = new GridConcurrentHashSet<>();
 
+    /** Minimal product version seen among remote nodes, {@code null} if no remote node was seen. */
+    private final IgniteProductVersion minRmtProdVer;
+
     /**
      * @param state Current cluster state.
      * @param loc Local node.
@@ -95,6 +99,7 @@ public class DiscoCache {
      * @param cacheGrpAffNodes Affinity nodes by cache group ID.
      * @param nodeMap Node map.
      * @param alives Alive nodes.
+     * @param minRmtProdVer Minimal product version seen for remote nodes.
      */
     DiscoCache(
         DiscoveryDataClusterState state,
@@ -109,7 +114,9 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> allCacheNodes,
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
-        Set<UUID> alives) {
+        Set<UUID> alives,
+        IgniteProductVersion minRmtProdVer
+    ) {
         this.state = state;
         this.loc = loc;
         this.rmtNodes = rmtNodes;
@@ -123,6 +130,7 @@ public class DiscoCache {
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
         this.alives.addAll(alives);
+        this.minRmtProdVer = minRmtProdVer;
     }
 
     /**
@@ -209,7 +217,7 @@ public class DiscoCache {
     /**
      * @return Oldest alive server node.
      */
-    public @Nullable ClusterNode oldestAliveServerNode(){
+    @Nullable public ClusterNode oldestAliveServerNode(){
         Iterator<ClusterNode> it = aliveServerNodes().iterator();
         return it.hasNext() ? it.next() : null;
     }
@@ -217,7 +225,7 @@ public class DiscoCache {
     /**
      * @return Oldest alive server node with at least one cache configured.
      */
-    public @Nullable ClusterNode oldestAliveServerNodeWithCache(){
+    @Nullable public ClusterNode oldestAliveServerNodeWithCache(){
         Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator();
         return it.hasNext() ? it.next() : null;
     }
@@ -254,7 +262,7 @@ public class DiscoCache {
      * @param id Node ID.
      * @return Node.
      */
-    public @Nullable ClusterNode node(UUID id) {
+    @Nullable public ClusterNode node(UUID id) {
         return nodeMap.get(id);
     }
 
@@ -280,6 +288,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return Minimum product version seen among remote nodes, or {@code null} if no remote node was seen.
+     */
+    public IgniteProductVersion minimumRemoteNodesVersion() {
+        return minRmtProdVer;
+    }
+
+    /**
      * @param nodes Cluster nodes.
      * @return Empty collection if nodes list is {@code null}
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 834ba4d..d7a9b4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
@@ -2165,6 +2166,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
         ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
 
+        IgniteProductVersion minProdVer = null;
+
         for (ClusterNode node : topSnapshot) {
             if (alive(node))
                 alives.add(node.id());
@@ -2174,9 +2177,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             else {
                 allNodes.add(node);
 
-                if (!node.isLocal())
+                if (!node.isLocal()) {
                     rmtNodes.add(node);
 
+                    if (minProdVer == null)
+                        minProdVer = node.version();
+                    else {
+                        if (minProdVer.compareTo(node.version()) > 0)
+                            minProdVer = node.version();
+                    }
+                }
+
                 if (!CU.clientNode(node))
                     srvNodes.add(node);
             }
@@ -2243,7 +2254,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             Collections.unmodifiableMap(allCacheNodes),
             Collections.unmodifiableMap(cacheGrpAffNodes),
             Collections.unmodifiableMap(nodeMap),
-            alives);
+            alives,
+            minProdVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2880860..4001922 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -118,6 +119,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.PRIMITIVE_UPD_CNTRS_SINCE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
 
@@ -929,9 +931,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param nodes Nodes.
      * @param msgTopVer Topology version. Will be added to full message.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes,
-        AffinityTopologyVersion msgTopVer) {
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null);
+    private void sendAllPartitions(
+        Collection<ClusterNode> nodes,
+        AffinityTopologyVersion msgTopVer
+    ) {
+        IgniteProductVersion minVer = cctx.discovery().discoCache(msgTopVer).minimumRemoteNodesVersion();
+        // minVer is null when the topology has no remote nodes: nobody needs the old counters format then.
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(
+            true,
+            minVer != null && minVer.compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0,
+            null,
+            null,
+            null,
+            null);
 
         m.topologyVersion(msgTopVer);
 
@@ -966,6 +978,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
         boolean compress,
+        boolean compatibility,
         @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -975,8 +988,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             lastVer,
             exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE,
             partHistSuppliers,
-            partsToReload
-            );
+            partsToReload,
+            compatibility);
 
         m.compress(compress);
 
@@ -1110,7 +1123,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         boolean sndCounters,
         ExchangeActions exchActions
     ) {
-        boolean compat = targetNode.version().compareTo(GridDhtPartitionsExchangeFuture.PRIMITIVE_UPD_CNTRS_SINCE) < 0;
+        boolean compat = targetNode.version().compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0;
 
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
             clientOnlyExchange,

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 1384a55..fde0599 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -50,6 +50,13 @@ public class CachePartitionFullCountersMap implements Serializable {
     }
 
     /**
+     * @return Total number of partitions (length of the backing counters array).
+     */
+    public int size() {
+        return initialUpdCntrs.length;
+    }
+
+    /**
      * Gets an initial update counter by the partition ID.
      *
      * @param p Partition ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e8da0b9..d6183fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -118,7 +118,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         IgniteSystemProperties.getInteger(IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD, 0);
 
     /** */
-    public static IgniteProductVersion PRIMITIVE_UPD_CNTRS_SINCE = IgniteProductVersion.fromString("8.1.4");
+    public static final IgniteProductVersion PRIMITIVE_UPD_CNTRS_SINCE = IgniteProductVersion.fromString("8.1.4");
 
     /** */
     @GridToStringExclude
@@ -1167,11 +1167,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param compress Message compress flag.
      * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress, boolean compatibility) {
         GridCacheVersion last = lastVer.get();
 
         GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
             compress,
+            compatibility,
             exchangeId(),
             last != null ? last : cctx.versions().last(),
             partHistSuppliers,
@@ -1188,7 +1189,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(true);
+        IgniteProductVersion minVer = cctx.discovery().discoCache(topologyVersion()).minimumRemoteNodesVersion();
+        boolean compat = minVer != null && minVer.compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0; // null: no remote nodes.
+        GridDhtPartitionsFullMessage m = createPartitionsMessage(true, compat);
 
         assert !nodes.contains(cctx.localNode());
 
@@ -1510,7 +1513,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+            IgniteProductVersion minVer = cctx.discovery().discoCache(topologyVersion()).minimumRemoteNodesVersion();
+            // minVer is null when the topology has no remote nodes: compatibility mode is not needed then.
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(false,
+                minVer != null && minVer.compareTo(PRIMITIVE_UPD_CNTRS_SINCE) < 0);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 4e04f19..8d259ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -260,7 +260,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 partsBytes0 = U.marshal(ctx, parts);
 
             if (primPartCntrs != null && partCntrsBytes == null)
-                partCntrsBytes0 = U.marshal(ctx, compatibilityMode ? convertToOld(primPartCntrs) : primPartCntrs);
+                partCntrsBytes0 = U.marshal(ctx, compatibilityMode ? primPartCntrs.toOldMap() : primPartCntrs);
 
             if (partHistSuppliers != null && partHistSuppliersBytes == null)
                 partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers);
@@ -543,4 +543,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", parts != null ? parts.size() : 0,
             "super", super.toString());
     }
+
+    /**
+     * @param obj Counters map, either in the new (primitive) or in the old representation.
+     * @return {@code obj} itself if already in the new format, otherwise the old map converted to the new format.
+     */
+    private static IgniteDhtPartitionCountersPrimitiveMap convertToNew(Object obj) {
+        if (obj instanceof IgniteDhtPartitionCountersPrimitiveMap)
+            return (IgniteDhtPartitionCountersPrimitiveMap)obj;
+
+        IgniteDhtPartitionCountersMap old = (IgniteDhtPartitionCountersMap)obj;
+
+        return old.convertToNew();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 9db80ae..1e5c8e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -61,4 +61,28 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
 
         return cntrMap;
     }
+
+    /**
+     * @return Newly created primitive counters map populated from the entries of this map.
+     */
+    public synchronized IgniteDhtPartitionCountersPrimitiveMap convertToNew() {
+        IgniteDhtPartitionCountersPrimitiveMap res = new IgniteDhtPartitionCountersPrimitiveMap();
+
+        for (Map.Entry<Integer, Map<Integer, T2<Long, Long>>> entry : map.entrySet()) {
+            Map<Integer, T2<Long, Long>> val = entry.getValue();
+            // NOTE(review): sized by entry count, filled by partition ID - assumes dense IDs 0..size-1, confirm.
+            CachePartitionFullCountersMap fullCntrsMap = new CachePartitionFullCountersMap(val.size());
+
+            for (Map.Entry<Integer, T2<Long, Long>> cacheEntry : val.entrySet()) {
+                int part = cacheEntry.getKey();
+
+                fullCntrsMap.initialUpdateCounter(part, cacheEntry.getValue().get1());
+                fullCntrsMap.updateCounter(part, cacheEntry.getValue().get2());
+            }
+
+            res.putIfAbsent(entry.getKey(), fullCntrsMap);
+        }
+
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbcae598/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
index af9be7f..270d29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersPrimitiveMap.java
@@ -21,6 +21,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Partition counters map.
@@ -59,4 +61,25 @@ public class IgniteDhtPartitionCountersPrimitiveMap implements Serializable {
 
         return cntrMap;
     }
+
+    /**
+     * @return Newly created old-format map with an (initial counter, update counter) pair per partition index.
+     */
+    public synchronized IgniteDhtPartitionCountersMap toOldMap() {
+        IgniteDhtPartitionCountersMap res = new IgniteDhtPartitionCountersMap();
+
+        for (Map.Entry<Integer, CachePartitionFullCountersMap> entry : map.entrySet()) {
+            CachePartitionFullCountersMap val = entry.getValue();
+
+            Map<Integer, T2<Long, Long>> fullParts = U.newHashMap(val.size());
+            // The old format keys counters by boxed partition index: emit a pair for every index 0..size-1.
+            for (int i = 0; i < val.size(); i++) {
+                fullParts.put(i, new T2<>(val.initialUpdateCounter(i), val.updateCounter(i)));
+            }
+
+            res.putIfAbsent(entry.getKey(), fullParts);
+        }
+
+        return res;
+    }
 }