You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:36:03 UTC
[34/50] ignite git commit: ignite-5578 Affinity for local join
ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a46272c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a46272c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a46272c
Branch: refs/heads/ignite-5578
Commit: 4a46272c61821e90e48c1e843f5dd1eda0320a09
Parents: 0e7064d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 12 18:25:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 12 18:37:55 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 4 +-
.../cache/CacheAffinitySharedManager.java | 1 +
.../dht/preloader/CacheGroupAffinity.java | 159 -------------
.../preloader/CacheGroupAffinityMessage.java | 229 +++++++++++++++++++
.../GridDhtPartitionsExchangeFuture.java | 91 ++------
.../preloader/GridDhtPartitionsFullMessage.java | 11 +-
.../CacheLateAffinityAssignmentTest.java | 2 +-
7 files changed, 262 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 261a619..003c2f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,7 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinity;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -871,7 +871,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 128:
- msg = new CacheGroupAffinity();
+ msg = new CacheGroupAffinityMessage();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 3f24547..879e6a9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1460,6 +1460,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* @param fut Exchange future.
+ * @param newAff {@code True} if there are no older nodes with affinity info available.
* @throws IgniteCheckedException If failed.
* @return Future completed when caches initialization is done.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
deleted file mode 100644
index 1e1509a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CacheGroupAffinity implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int grpId;
-
- /** */
- @GridDirectCollection(GridLongList.class)
- private List<GridLongList> assign;
-
- /**
- *
- */
- public CacheGroupAffinity() {
- // No-op.
- }
-
- /**
- * @param grpId Group ID.
- * @param assign0 Assignment.
- */
- CacheGroupAffinity(int grpId, List<List<ClusterNode>> assign0) {
- this.grpId = grpId;
-
- assign = new ArrayList<>(assign0.size());
-
- for (int i = 0; i < assign0.size(); i++) {
- List<ClusterNode> nodes = assign0.get(i);
-
- GridLongList l = new GridLongList(nodes.size());
-
- for (int n = 0; n < nodes.size(); n++)
- l.add(nodes.get(n).order());
-
- assign.add(l);
- }
- }
-
- /**
- * @return Cache group ID.
- */
- int groupId() {
- return grpId;
- }
-
- /**
- * @return Assignments.
- */
- List<GridLongList> assignments() {
- return assign;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeCollection("assign", assign, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeInt("grpId", grpId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- assign = reader.readCollection("assign", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- grpId = reader.readInt("grpId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CacheGroupAffinity.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 128;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
new file mode 100644
index 0000000..5cd5d26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message carrying the affinity assignment of a single cache group, with nodes encoded by their order.
+ */
+public class CacheGroupAffinityMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache group ID. */
+ private int grpId;
+
+ /** Assignment encoded as lists of node orders, one list per entry of the source assignment. */
+ @GridDirectCollection(GridLongList.class)
+ private List<GridLongList> assigns;
+
+ /**
+ * Creates an empty message; fields are filled in later by {@link #readFrom(ByteBuffer, MessageReader)}.
+ */
+ public CacheGroupAffinityMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param grpId Cache group ID.
+ * @param assign0 Assignment to encode; every node is stored as its {@link ClusterNode#order()}.
+ */
+ private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> assign0) {
+ this.grpId = grpId;
+
+ assigns = new ArrayList<>(assign0.size());
+
+ for (int i = 0; i < assign0.size(); i++) {
+ List<ClusterNode> nodes = assign0.get(i);
+
+ GridLongList l = new GridLongList(nodes.size());
+
+ for (int n = 0; n < nodes.size(); n++)
+ l.add(nodes.get(n).order());
+
+ assigns.add(l);
+ }
+ }
+
+ /**
+ * @return Cache group ID.
+ */
+ int groupId() {
+ return grpId;
+ }
+
+ /**
+ * @param cctx Shared cache context used to obtain the affinity of each requested group.
+ * @param topVer Topology version the assignments are taken for.
+ * @param affReq IDs of the cache groups to create messages for, must not be empty.
+ * @param cachesAff Optional already prepared messages; when not {@code null} this map is reused and added to.
+ * @return Messages by group ID ({@code cachesAff} itself if it was passed, otherwise a new map).
+ */
+ static Map<Integer, CacheGroupAffinityMessage> createAffinityMessages(
+ GridCacheSharedContext cctx,
+ AffinityTopologyVersion topVer,
+ Collection<Integer> affReq,
+ @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff) {
+ assert !F.isEmpty(affReq);
+
+ if (cachesAff == null)
+ cachesAff = U.newHashMap(affReq.size());
+
+ for (Integer grpId : affReq) {
+ if (!cachesAff.containsKey(grpId)) {
+ List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topVer);
+
+ cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, assign));
+ }
+ }
+
+ return cachesAff;
+ }
+
+ /**
+ * @param nodesByOrder Cache of nodes by order; updated with every node resolved through {@code discoCache}.
+ * @param discoCache Discovery data cache used to look up server nodes missing from {@code nodesByOrder}.
+ * @return Assignments with the stored node orders resolved back to {@link ClusterNode} instances.
+ */
+ List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+ List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+
+ for (int p = 0; p < assigns.size(); p++) {
+ GridLongList assign = assigns.get(p);
+ List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+
+ for (int n = 0; n < assign.size(); n++) {
+ long order = assign.get(n);
+
+ ClusterNode affNode = nodesByOrder.get(order);
+
+ if (affNode == null) {
+ affNode = discoCache.serverNodeByOrder(order);
+
+ assert affNode != null : order;
+
+ nodesByOrder.put(order, affNode);
+ }
+
+ assign0.add(affNode);
+ }
+
+ assignments0.add(assign0);
+ }
+
+ return assignments0;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeCollection("assigns", assigns, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeInt("grpId", grpId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ assigns = reader.readCollection("assigns", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ grpId = reader.readInt("grpId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CacheGroupAffinityMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 128;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheGroupAffinityMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4a39bae..ab66df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1183,7 +1182,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param cachesAff Affinity if was requested by some nodes.
* @throws IgniteCheckedException If failed.
*/
- private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinity> cachesAff)
+ private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinityMessage> cachesAff)
throws IgniteCheckedException {
boolean singleNode = nodes.size() == 1;
@@ -1443,7 +1442,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
}
- processMessage(node.id(), msg);
+ processSingleMessage(node.id(), msg);
}
});
}
@@ -1453,7 +1452,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param nodeId Sender node.
* @param msg Message.
*/
- private void processMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+ private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
boolean allReceived = false;
boolean updateSingleMap = false;
@@ -1723,29 +1722,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @param affReq Cache group IDs.
- * @param cachesAff Optional already prepared affinity.
- * @return Affinity.
- */
- private Map<Integer, CacheGroupAffinity> initCachesAffinity(Collection<Integer> affReq,
- @Nullable Map<Integer, CacheGroupAffinity> cachesAff) {
- assert !F.isEmpty(affReq);
-
- if (cachesAff == null)
- cachesAff = U.newHashMap(affReq.size());
-
- for (Integer grpId : affReq) {
- if (!cachesAff.containsKey(grpId)) {
- List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topologyVersion());
-
- cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
- }
- }
-
- return cachesAff;
- }
-
- /**
*
*/
private void onAllReceived() {
@@ -1761,7 +1737,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- Map<Integer, CacheGroupAffinity> cachesAff = null;
+ Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
GridDhtPartitionsSingleMessage msg = e.getValue();
@@ -1784,7 +1760,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
if (affReq != null) {
- cachesAff = initCachesAffinity(affReq, cachesAff);
+ cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
+ topologyVersion(),
+ affReq,
+ cachesAff);
UUID nodeId = e.getKey();
@@ -1930,10 +1909,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (n != null) {
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
- Collection<CacheGroupAffinity> cachesAff = null;
+ Collection<CacheGroupAffinityMessage> cachesAff = null;
if (affReq != null) {
- Map<Integer, CacheGroupAffinity> affMap = initCachesAffinity(affReq, null);
+ Map<Integer, CacheGroupAffinityMessage> affMap = CacheGroupAffinityMessage.createAffinityMessages(
+ cctx,
+ msg.exchangeId().topologyVersion(),
+ affReq,
+ null);
cachesAff = affMap.values();
}
@@ -2004,7 +1987,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return;
}
- processMessage(node, msg);
+ processFullMessage(node, msg);
}
});
}
@@ -2038,7 +2021,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param node Sender node.
* @param msg Message.
*/
- private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
+ private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
assert exchId.equals(msg.exchangeId()) : msg;
assert msg.lastVersion() != null : msg;
@@ -2065,50 +2048,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (localJoinExchange() && affReq != null) {
Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
- Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity();
+ Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity();
assert !F.isEmpty(cachesAff) : msg;
assert cachesAff.size() >= affReq.size();
int cnt = 0;
- for (CacheGroupAffinity aff : cachesAff) {
+ for (CacheGroupAffinityMessage aff : cachesAff) {
if (affReq.contains(aff.groupId())) {
CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
assert grp != null : aff.groupId();
assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
- List<GridLongList> assignments = aff.assignments();
- List<List<ClusterNode>> assignments0 = new ArrayList<>(assignments.size());
-
- for (int p = 0; p < assignments.size(); p++) {
- GridLongList assign = assignments.get(p);
- List<ClusterNode> assign0 = new ArrayList<>(assign.size());
-
- for (int n = 0; n < assign.size(); n++) {
- long order = assign.get(n);
-
- ClusterNode affNode = nodesByOrder.get(order);
-
- if (affNode == null) {
- affNode = discoCache.serverNodeByOrder(order);
-
- assert affNode != null : order;
-
- nodesByOrder.put(order, affNode);
- }
-
- assign0.add(affNode);
- }
-
- assignments0.add(assign0);
- }
+ List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache);
+ // Calculate ideal assignments.
if (!grp.affinity().centralizedAffinityFunction())
grp.affinity().calculate(topologyVersion(), discoEvt, discoCache);
- grp.affinity().initialize(topologyVersion(), assignments0);
+ grp.affinity().initialize(topologyVersion(), assignments);
try {
grp.topology().initPartitions(this);
@@ -2309,7 +2269,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
boolean crdChanged = false;
boolean allReceived = false;
- Set<UUID> remaining0 = null;
ClusterNode crd0;
@@ -2328,9 +2287,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (crd != null && crd.isLocal()) {
- if (crdChanged)
- remaining0 = new HashSet<>(remaining);
- else if (crdReady && rmvd)
+ if (!crdChanged && crdReady && rmvd)
allReceived = remaining.isEmpty();
}
@@ -2390,7 +2347,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
sendPartitions(crd0);
for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
- processMessage(m.getKey(), m.getValue());
+ processFullMessage(m.getKey(), m.getValue());
}
}
}
@@ -2447,7 +2404,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
- processMessage(m.getKey(), m.getValue());
+ processSingleMessage(m.getKey(), m.getValue());
}
else {
awaitSingleMapUpdates();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 1ef383a..edc9c9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -104,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private transient boolean compress;
/** */
- @GridDirectCollection(CacheGroupAffinity.class)
- private Collection<CacheGroupAffinity> cachesAff;
+ @GridDirectCollection(CacheGroupAffinityMessage.class)
+ private Collection<CacheGroupAffinityMessage> cachesAff;
/**
* Required by {@link Externalizable}.
@@ -156,7 +155,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
* @param cachesAff Affinity.
* @return Message copy.
*/
- GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) {
+ GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
assert !F.isEmpty(cachesAff) : cachesAff;
GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
@@ -171,14 +170,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @return Affinity.
*/
- @Nullable Collection<CacheGroupAffinity> cachesAffinity() {
+ @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() {
return cachesAff;
}
/**
* @param cachesAff Affinity.
*/
- void cachesAffinity(Collection<CacheGroupAffinity> cachesAff) {
+ void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
this.cachesAff = cachesAff;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index bb99266..23043d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -1458,7 +1458,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void _testRandomOperations() throws Exception {
+ public void testRandomOperations() throws Exception {
forceSrvMode = true;
final int MAX_SRVS = 10;