You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:35:54 UTC

[25/50] ignite git commit: ignite-5578 aff on join

ignite-5578 aff on join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a732afcf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a732afcf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a732afcf

Branch: refs/heads/ignite-5578
Commit: a732afcfdd77b7ed830f094846be300a94f36549
Parents: 6c52ee1
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 23:19:29 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 23:49:46 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 24 +++++
 .../cache/CacheAffinitySharedManager.java       | 13 ++-
 .../processors/cache/ExchangeContext.java       | 15 +++-
 .../GridCachePartitionExchangeManager.java      | 15 ++++
 .../GridDhtPartitionsExchangeFuture.java        | 95 ++++++++++++++------
 .../preloader/GridDhtPartitionsFullMessage.java | 11 ++-
 .../CacheLateAffinityAssignmentTest.java        |  2 +-
 7 files changed, 139 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 72f482a..f63c5f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -81,6 +82,9 @@ public class DiscoCache {
     /** Alive nodes. */
     private final Set<UUID> alives = new GridConcurrentHashSet<>();
 
+    /** */
+    private final IgniteProductVersion minNodeVer;
+
     /**
      * @param state Current cluster state.
      * @param loc Local node.
@@ -123,6 +127,26 @@ public class DiscoCache {
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
         this.alives.addAll(alives);
+
+        IgniteProductVersion minVer = null;
+
+        for (int i = 0; i < allNodes.size(); i++) {
+            ClusterNode node = allNodes.get(i);
+
+            if (minVer == null)
+                minVer = node.version();
+            else if (node.version().compareTo(minVer) < 0)
+                minVer = node.version();
+        }
+
+        minNodeVer = minVer;
+    }
+
+    /**
+     * @return Minimum node version.
+     */
+    public IgniteProductVersion minimumNodeVersion() {
+        return minNodeVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f72d0e5..6794c2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1346,8 +1346,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     fetchFuts.add(fetchFut);
                 }
-                else
-                    fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
+                else {
+                    if (fut.discoCache().serverNodes().size() > 0)
+                        fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
+                    else {
+                        List<List<ClusterNode>> aff = grp.affinity().calculate(topVer,
+                                fut.discoveryEvent(),
+                                fut.discoCache());
+
+                        grp.affinity().initialize(topVer, aff);
+                    }
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 167ec4b..6caca5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -33,17 +33,24 @@ public class ExchangeContext {
     private boolean fetchAffOnJoin;
 
     /**
+     * @param protocolVer Protocol version.
+     */
+    public ExchangeContext(int protocolVer) {
+        fetchAffOnJoin = protocolVer == 1;
+    }
+
+    /**
      * @return {@code True} if on local join need fetch affinity per-group (old protocol),
-     * otherwise affinity is sent in {@link GridDhtPartitionsFullMessage}.
+     *      otherwise affinity is sent in {@link GridDhtPartitionsFullMessage}.
      */
-    public boolean fetchAffinityOnJoin() {
-        return false;
+    boolean fetchAffinityOnJoin() {
+        return fetchAffOnJoin;
     }
 
     /**
      * @param grpId Cache group ID.
      */
-    public void addGroupAffinityRequestOnJoin(Integer grpId) {
+    void addGroupAffinityRequestOnJoin(Integer grpId) {
         if (requestGrpsAffOnJoin == null)
             requestGrpsAffOnJoin = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 69653a9..018537c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -129,6 +130,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private static final int EXCHANGE_HISTORY_SIZE =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000);
 
+    /** */
+    private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.0");
+
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
@@ -589,6 +593,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param ver Node version.
+     * @return Supported exchange protocol version.
+     */
+    public int exchangeProtocolVersion(IgniteProductVersion ver) {
+        if (ver.compareToIgnoreTimestamp(EXCHANGE_PROTOCOL_2_SINCE) >= 0)
+            return 2;
+
+        return 1;
+    }
+
+    /**
      * @param idx Index.
      * @return Topic for index.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 72a3fe5..a767f2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -435,7 +435,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert discoEvt != null : this;
         assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
 
-        exchCtx = new ExchangeContext();
+        exchCtx = new ExchangeContext(cctx.exchange().exchangeProtocolVersion(discoCache.minimumNodeVersion()));
 
         try {
             discoCache.updateAlives(cctx.discovery());
@@ -863,7 +863,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
                 continue;
 
-            if (!localJoinExchange() || cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))
+            if (!localJoinExchange() || grp.affinity().lastVersion().topologyVersion() > 0)
                 grp.topology().beforeExchange(this, !centralizedAff);
         }
 
@@ -1174,6 +1174,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinity> cachesAff)
         throws IgniteCheckedException {
+        boolean singleNode = nodes.size() == 1;
+
         GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
 
         GridDhtPartitionsFullMessage msgWithAff = null;
@@ -1191,13 +1193,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             GridDhtPartitionsFullMessage sndMsg = msg;
 
             if (cachesAff != null) {
-                GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
+                if (singleNode)
+                    msg.cachesAffinity(cachesAff);
+                else {
+                    GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
 
-                if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
-                    if (msgWithAff == null)
-                        msgWithAff = msg.copyWithAffinity(cachesAff);
+                    if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
+                        if (msgWithAff == null)
+                            msgWithAff = msg.copyWithAffinity(cachesAff);
 
-                    sndMsg = msgWithAff;
+                        sndMsg = msgWithAff;
+                    }
                 }
             }
 
@@ -1398,21 +1404,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         assert msg != null;
-        assert msg.exchangeId().equals(exchId) : msg;
+        assert exchId.equals(msg.exchangeId()) : msg;
         assert msg.lastVersion() != null : msg;
 
-        if (!msg.client())
-            updateLastVersion(msg.lastVersion());
-
         if (isDone()) {
             if (log.isDebugEnabled())
                 log.debug("Received message for finished future (will reply only to sender) [msg=" + msg +
                     ", fut=" + this + ']');
 
             if (!centralizedAff)
-                sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount());
+                sendAllPartitions(msg, node.id(), cctx.gridConfig().getNetworkSendRetryCount());
         }
         else {
+            assert !msg.client() : msg;
+
+            updateLastVersion(msg.lastVersion());
+
             initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
                 @Override public void apply(IgniteInternalFuture<Boolean> f) {
                     try {
@@ -1704,6 +1711,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
     }
 
+    private Map<Integer, CacheGroupAffinity> initCachesAffinity(Collection<Integer> affReq,
+       @Nullable Map<Integer, CacheGroupAffinity> cachesAff) {
+       assert !F.isEmpty(affReq);
+
+        if (cachesAff == null)
+            cachesAff = U.newHashMap(affReq.size());
+
+        for (Integer grpId : affReq) {
+            if (!cachesAff.containsKey(grpId)) {
+                List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId);
+
+                cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
+            }
+        }
+
+        return cachesAff;
+    }
+
     /**
      *
      */
@@ -1715,8 +1740,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             if (!crd.equals(discoCache.serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (!grp.isLocal())
+                    if (!grp.isLocal()) {
+                        if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) {
+                            List<List<ClusterNode>> aff = grp.affinity().calculate(topologyVersion(),
+                                    discoEvt,
+                                    discoCache);
+
+                            grp.affinity().initialize(topologyVersion(), aff);
+                        }
+
                         grp.topology().beforeExchange(this, !centralizedAff);
+                    }
                 }
             }
 
@@ -1738,16 +1772,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                if (affReq != null) {
-                    if (cachesAff == null)
-                        cachesAff = U.newHashMap(affReq.size());
-
-                    for (Integer grpId : affReq) {
-                        List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId);
-
-                        cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
-                    }
-                }
+                if (affReq != null)
+                    cachesAff = initCachesAffinity(affReq, cachesAff);
             }
 
             if (discoEvt.type() == EVT_NODE_JOINED)
@@ -1849,15 +1875,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param msg Request.
      * @param nodeId Node ID.
      * @param retryCnt Number of retries.
      */
-    private void sendAllPartitions(final UUID nodeId, final int retryCnt) {
+    private void sendAllPartitions(final GridDhtPartitionsSingleMessage msg, final UUID nodeId, final int retryCnt) {
         ClusterNode n = cctx.node(nodeId);
 
         try {
-            if (n != null)
-                sendAllPartitions(F.asList(n), null);
+            if (n != null) {
+                Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+                Collection<CacheGroupAffinity> cachesAff = null;
+
+                if (affReq != null) {
+                    Map<Integer, CacheGroupAffinity> affMap = initCachesAffinity(affReq, null);
+
+                    cachesAff = affMap.values();
+                }
+
+                sendAllPartitions(F.asList(n), cachesAff);
+            }
         }
         catch (IgniteCheckedException e) {
             if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
@@ -1882,7 +1920,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout) {
                     @Override public void onTimeout() {
-                        sendAllPartitions(nodeId, retryCnt - 1);
+                        sendAllPartitions(msg, nodeId, retryCnt - 1);
                     }
                 });
             }
@@ -1995,6 +2033,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         assignments0.add(assign0);
                     }
 
+                    if (!grp.affinity().centralizedAffinityFunction())
+                        grp.affinity().calculate(topologyVersion(), discoEvt, discoCache);
+
                     grp.affinity().initialize(topologyVersion(), assignments0);
 
                     cnt++;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0a723f0..1ef383a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -156,7 +156,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param cachesAff Affinity.
      * @return Message copy.
      */
-    public GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) {
+    GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) {
         assert !F.isEmpty(cachesAff) : cachesAff;
 
         GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
@@ -171,10 +171,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @return Affinity.
      */
-    @Nullable public Collection<CacheGroupAffinity> cachesAffinity() {
+    @Nullable Collection<CacheGroupAffinity> cachesAffinity() {
         return cachesAff;
     }
 
+    /**
+     * @param cachesAff Affinity.
+     */
+    void cachesAffinity(Collection<CacheGroupAffinity> cachesAff) {
+        this.cachesAff = cachesAff;
+    }
+
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a732afcf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 23043d1..bb99266 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -1458,7 +1458,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testRandomOperations() throws Exception {
+    public void _testRandomOperations() throws Exception {
         forceSrvMode = true;
 
         final int MAX_SRVS = 10;