You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/19 10:13:31 UTC

[2/6] ignite git commit: ignite-5578 join

ignite-5578 join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056847c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056847c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056847c0

Branch: refs/heads/ignite-5578
Commit: 056847c091d091d678f6c96432d00e196115c3e7
Parents: dd44df9
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:19:03 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:19:03 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 17 -----
 .../preloader/CacheGroupAffinityMessage.java    | 75 ++++++++------------
 .../GridDhtPartitionsExchangeFuture.java        |  4 +-
 .../preloader/GridDhtPartitionsFullMessage.java | 37 +++-------
 4 files changed, 43 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index cfc3671..80121e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2256,23 +2256,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return super.values();
         }
 
-        /**
-         * @param exchangeId Exchange ID.
-         * @return Future.
-         */
-        public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) {
-            ListIterator<GridDhtPartitionsExchangeFuture> it = listIterator(size() - 1);
-
-            while (it.hasPrevious()) {
-                GridDhtPartitionsExchangeFuture fut0 = it.previous();
-
-                if (fut0.exchangeId().equals(exchangeId))
-                    return fut0;
-            }
-
-            return null;
-        }
-
         /** {@inheritDoc} */
         @Override public synchronized String toString() {
             return S.toString(ExchangeFutureSet.class, this, super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 5cd5d26..ee4ef45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -45,9 +45,6 @@ public class CacheGroupAffinityMessage implements Message {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private int grpId;
-
-    /** */
     @GridDirectCollection(GridLongList.class)
     private List<GridLongList> assigns;
 
@@ -59,12 +56,9 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
-     * @param grpId Group ID.
      * @param assign0 Assignment.
      */
-    private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> assign0) {
-        this.grpId = grpId;
-
+    private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0) {
         assigns = new ArrayList<>(assign0.size());
 
         for (int i = 0; i < assign0.size(); i++) {
@@ -80,13 +74,6 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
-     * @return Cache group ID.
-     */
-    int groupId() {
-        return grpId;
-    }
-
-    /**
      * @param cctx Context.
      * @param topVer Topology version.
      * @param affReq Cache group IDs.
@@ -115,34 +102,46 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
+     * @param assign Nodes orders.
      * @param nodesByOrder Nodes by order cache.
      * @param discoCache Discovery data cache.
-     * @return Assignments.
+     * @return Nodes list.
      */
-    List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
-        List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+    public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+        List<ClusterNode> assign0 = new ArrayList<>(assign.size());
 
-        for (int p = 0; p < assigns.size(); p++) {
-            GridLongList assign = assigns.get(p);
-            List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+        for (int n = 0; n < assign.size(); n++) {
+            long order = assign.get(n);
 
-            for (int n = 0; n < assign.size(); n++) {
-                long order = assign.get(n);
+            ClusterNode affNode = nodesByOrder.get(order);
 
-                ClusterNode affNode = nodesByOrder.get(order);
+            if (affNode == null) {
+                affNode = discoCache.serverNodeByOrder(order);
 
-                if (affNode == null) {
-                    affNode = discoCache.serverNodeByOrder(order);
+                assert affNode != null : "Failed to find node by order [order=" + order +
+                    ", topVer=" + discoCache.version() + ']';
 
-                    assert affNode != null : order;
+                nodesByOrder.put(order, affNode);
+            }
 
-                    nodesByOrder.put(order, affNode);
-                }
+            assign0.add(affNode);
+        }
 
-                assign0.add(affNode);
-            }
+        return assign0;
+    }
 
-            assignments0.add(assign0);
+    /**
+     * @param nodesByOrder Nodes by order cache.
+     * @param discoCache Discovery data cache.
+     * @return Assignments.
+     */
+    public List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+        List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+
+        for (int p = 0; p < assigns.size(); p++) {
+            GridLongList assign = assigns.get(p);
+
+            assignments0.add(toNodes(assign, nodesByOrder, discoCache));
         }
 
         return assignments0;
@@ -167,12 +166,6 @@ public class CacheGroupAffinityMessage implements Message {
 
                 writer.incrementState();
 
-            case 1:
-                if (!writer.writeInt("grpId", grpId))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -194,14 +187,6 @@ public class CacheGroupAffinityMessage implements Message {
 
                 reader.incrementState();
 
-            case 1:
-                grpId = reader.readInt("grpId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(CacheGroupAffinityMessage.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index fa30fa2..6da7876 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -227,6 +227,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private ExchangeContext exchCtx;
 
     /** */
+    @GridToStringExclude
     private FinishState finishState;
 
     /**
@@ -873,7 +874,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
                 continue;
 
-            if (!localJoinExchange() || grp.affinity().lastVersion().topologyVersion() > 0)
+            // It is possible affinity is not initialized yet if node joins to cluster.
+            if (grp.affinity().lastVersion().topologyVersion() > 0)
                 grp.topology().beforeExchange(this, !centralizedAff);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index edc9c9e..1ea8757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -103,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     private transient boolean compress;
 
     /** */
-    @GridDirectCollection(CacheGroupAffinityMessage.class)
-    private Collection<CacheGroupAffinityMessage> cachesAff;
+    @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+    private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff;
 
     /**
      * Required by {@link Externalizable}.
@@ -148,37 +148,32 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.partsToReload = partsToReload;
         cp.partsToReloadBytes = partsToReloadBytes;
         cp.topVer = topVer;
-        cp.cachesAff = cachesAff;
+        cp.joinedNodeAff = joinedNodeAff;
     }
 
     /**
-     * @param cachesAff Affinity.
      * @return Message copy.
      */
-    GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
-        assert !F.isEmpty(cachesAff) : cachesAff;
-
+    GridDhtPartitionsFullMessage copy() {
         GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
 
         copyStateTo(cp);
 
-        cp.cachesAff = cachesAff;
-
         return cp;
     }
 
     /**
-     * @return Affinity.
+     * @return Caches affinity for joining nodes.
      */
-    @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() {
-        return cachesAff;
+    @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() {
+        return joinedNodeAff;
     }
 
     /**
-     * @param cachesAff Affinity.
+     * @param joinedNodeAff Caches affinity for joining nodes.
      */
-    void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
-        this.cachesAff = cachesAff;
+    void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
+        this.joinedNodeAff = joinedNodeAff;
     }
 
     /** {@inheritDoc} */
@@ -461,11 +456,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 5:
-                if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
 
             case 6:
                 if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
@@ -525,13 +515,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 5:
-                cachesAff = reader.readCollection("cachesAff", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
 
             case 6:
                 dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);