You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/02 11:01:24 UTC
[1/6] ignite git commit: ignite-4154
Repository: ignite
Updated Branches:
refs/heads/ignite-4154-2 [created] d5d58f0af
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8ed7a74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8ed7a74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8ed7a74
Branch: refs/heads/ignite-4154-2
Commit: a8ed7a740ebf86731831d0125db49f54cedd11d5
Parents: 76126bb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 1 16:59:44 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 1 16:59:44 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 72 ++++++++++----------
1 file changed, 35 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8ed7a74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0277061..d03ba5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2140,8 +2140,6 @@ class ServerImpl extends TcpDiscoveryImpl {
this.discardId = discardId;
this.customDiscardId = customDiscardId;
-
- cleanup();
}
/**
@@ -2163,41 +2161,41 @@ class ServerImpl extends TcpDiscoveryImpl {
*
*/
void cleanup() {
-// Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
-//
-// boolean skipMsg = discardId != null;
-// boolean skipCustomMsg = customDiscardId != null;
-//
-// while (msgIt.hasNext()) {
-// TcpDiscoveryAbstractMessage msg0 = msgIt.next();
-//
-// if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
-// if (skipCustomMsg) {
-// assert customDiscardId != null;
-//
-// if (F.eq(customDiscardId, msg0.id()))
-// skipCustomMsg = false;
-// else
-// msgIt.remove();
-//
-// continue;
-// }
-// }
-// else {
-// if (skipMsg) {
-// assert discardId != null;
-//
-// if (F.eq(discardId, msg0.id()))
-// skipMsg = false;
-// else
-// msgIt.remove();
-//
-// continue;
-// }
-// }
-//
-// break;
-// }
+ Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+ boolean skipMsg = discardId != null;
+ boolean skipCustomMsg = customDiscardId != null;
+
+ while (msgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+ if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+ if (skipCustomMsg) {
+ assert customDiscardId != null;
+
+ if (F.eq(customDiscardId, msg0.id()))
+ skipCustomMsg = false;
+ else
+ msgIt.remove();
+
+ continue;
+ }
+ }
+ else {
+ if (skipMsg) {
+ assert discardId != null;
+
+ if (F.eq(discardId, msg0.id()))
+ skipMsg = false;
+ else
+ msgIt.remove();
+
+ continue;
+ }
+ }
+
+ break;
+ }
}
/**
[2/6] ignite git commit: ignite-4154
Posted by sb...@apache.org.
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4568ff8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4568ff8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4568ff8
Branch: refs/heads/ignite-4154-2
Commit: d4568ff86cd6af332b1789b52e170f8906c5aee0
Parents: a8ed7a7
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 1 20:18:12 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 1 20:18:12 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4568ff8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d03ba5b..ee58421 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2148,13 +2148,14 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param id Discarded message ID.
* @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
*/
- void discard(IgniteUuid id, boolean custom) {
+ void discard(IgniteUuid id, boolean custom, boolean cleanup) {
if (custom)
customDiscardId = id;
else
discardId = id;
- cleanup();
+ if (cleanup)
+ cleanup();
}
/**
@@ -4932,7 +4933,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId, msg.customMessageDiscard());
+ pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED);
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
[3/6] ignite git commit: ignite-4154
Posted by sb...@apache.org.
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17b82918
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17b82918
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17b82918
Branch: refs/heads/ignite-4154-2
Commit: 17b82918ad37c19fd6574ee1b5870c25fd9d540b
Parents: d4568ff
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 07:31:07 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 07:31:07 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 49 +++++++++++++-------
.../TcpDiscoveryNodeAddFinishedMessage.java | 11 +++++
2 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ee58421..9179ddb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+ nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
if (addFinishMsg.clientDiscoData() != null) {
+ addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+ msg = addFinishMsg;
+
Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
Set<UUID> replaced = null;
@@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
addFinishMsg.clientDiscoData(null);
+ addFinishMsg.clientNodeAttributes(null);
break;
}
@@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int MAX = 1024;
/** Pending messages. */
- private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.add(msg);
+ msgs.put(msg.id(), msg);
while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.poll();
+ TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
assert polled != null;
@@ -2135,8 +2141,10 @@ class ServerImpl extends TcpDiscoveryImpl {
) {
this.msgs.clear();
- if (msgs != null)
- this.msgs.addAll(msgs);
+ if (msgs != null) {
+ for (TcpDiscoveryAbstractMessage msg : msgs)
+ this.msgs.put(msg.id(), msg);
+ }
this.discardId = discardId;
this.customDiscardId = customDiscardId;
@@ -2148,21 +2156,26 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param id Discarded message ID.
* @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
*/
- void discard(IgniteUuid id, boolean custom, boolean cleanup) {
+ void discard(IgniteUuid id, boolean custom) {
if (custom)
customDiscardId = id;
else
discardId = id;
- if (cleanup)
- cleanup();
+ cleanup();
}
/**
*
*/
void cleanup() {
- Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ if (discardId != null && !msgs.containsKey(discardId))
+ return;
+
+ if (customDiscardId != null && !msgs.containsKey(customDiscardId))
+ return;
+
+ Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
boolean skipMsg = discardId != null;
boolean skipCustomMsg = customDiscardId != null;
@@ -2219,7 +2232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private boolean skipCustomMsg = customDiscardId != null;
/** Internal iterator. */
- private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
/** Next message. */
private TcpDiscoveryAbstractMessage next;
@@ -2837,7 +2850,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
- prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+ prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
@@ -2881,8 +2894,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -3045,8 +3058,8 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog(msg, "Pending messages will be resent to local node");
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
pendingMsg.senderNodeId(locNodeId);
@@ -3106,7 +3119,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsgs.msgs.isEmpty())
return false;
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
@@ -4933,7 +4946,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED);
+ pendingMsgs.discard(msgId, msg.customMessageDiscard());
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
}
/**
+ * @param msg Message.
+ */
+ public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+ super(msg);
+
+ nodeId = msg.nodeId;
+ clientDiscoData = msg.clientDiscoData;
+ clientNodeAttrs = msg.clientNodeAttrs;
+ }
+
+ /**
* Gets ID of the node added.
*
* @return ID of the node added.
[4/6] ignite git commit: ignite-4154
Posted by sb...@apache.org.
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f74c9f4e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f74c9f4e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f74c9f4e
Branch: refs/heads/ignite-4154-2
Commit: f74c9f4e24dc49176fc19e84069bfc10c53133e5
Parents: 17b8291
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 09:57:25 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 09:57:25 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 53 ++++----------------
.../messages/TcpDiscoveryNodeAddedMessage.java | 7 +++
2 files changed, 17 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74c9f4e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9179ddb..e182177 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2159,56 +2159,23 @@ class ServerImpl extends TcpDiscoveryImpl {
void discard(IgniteUuid id, boolean custom) {
if (custom)
customDiscardId = id;
- else
+ else {
discardId = id;
- cleanup();
- }
-
- /**
- *
- */
- void cleanup() {
- if (discardId != null && !msgs.containsKey(discardId))
- return;
-
- if (customDiscardId != null && !msgs.containsKey(customDiscardId))
- return;
-
- Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
-
- boolean skipMsg = discardId != null;
- boolean skipCustomMsg = customDiscardId != null;
+ TcpDiscoveryAbstractMessage msg = msgs.get(id);
- while (msgIt.hasNext()) {
- TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg;
- if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
- if (skipCustomMsg) {
- assert customDiscardId != null;
-
- if (F.eq(customDiscardId, msg0.id()))
- skipCustomMsg = false;
- else
- msgIt.remove();
-
- continue;
- }
+ msg0.oldNodesDiscoveryData(null);
+ msg0.newNodeDiscoveryData(null);
}
- else {
- if (skipMsg) {
- assert discardId != null;
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+ TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg;
- if (F.eq(discardId, msg0.id()))
- skipMsg = false;
- else
- msgIt.remove();
-
- continue;
- }
+ msg0.clientDiscoData(null);
+ msg0.clientNodeAttributes(null);
}
-
- break;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74c9f4e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..7b8e5c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -229,6 +229,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * @param newNodeDiscoData Discovery data from new node.
+ */
+ public void newNodeDiscoveryData(Map<Integer, byte[]> newNodeDiscoData) {
+ this.newNodeDiscoData = newNodeDiscoData;
+ }
+
+ /**
* @return Discovery data from old nodes.
*/
public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {
[6/6] ignite git commit: Merge branch 'ignite-4154' into ignite-4154-2
Posted by sb...@apache.org.
Merge branch 'ignite-4154' into ignite-4154-2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5d58f0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5d58f0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5d58f0a
Branch: refs/heads/ignite-4154-2
Commit: d5d58f0af5339345f610edfae7be1e179ce16821
Parents: 87b09db 8c624a8
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 14:01:04 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 14:01:04 2016 +0300
----------------------------------------------------------------------
.../processors/cache/distributed/IgniteCacheGetRestartTest.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
[5/6] ignite git commit: ignite-4154 affinity
Posted by sb...@apache.org.
ignite-4154 affinity
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87b09dba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87b09dba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87b09dba
Branch: refs/heads/ignite-4154-2
Commit: 87b09dba85b5ed7996ba93a10ef4f28eb398c4a8
Parents: f74c9f4
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 11:44:24 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 14:00:33 2016 +0300
----------------------------------------------------------------------
.../processors/affinity/AffinityAssignment.java | 88 ++++++++++
.../affinity/GridAffinityAssignment.java | 120 ++++++-------
.../affinity/GridAffinityAssignmentCache.java | 83 +++++++--
.../affinity/GridAffinityProcessor.java | 8 +-
.../processors/affinity/GridAffinityUtils.java | 8 +-
.../affinity/HistoryAffinityAssignment.java | 169 +++++++++++++++++++
.../cache/CacheAffinitySharedManager.java | 38 ++++-
.../cache/GridCacheAffinityManager.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../dht/preloader/GridDhtPreloader.java | 4 +-
10 files changed, 433 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
new file mode 100644
index 0000000..06207d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Cached affinity calculations.
+ */
+public interface AffinityAssignment {
+ /**
+ * @return {@code True} if related discovery event did not cause affinity assignment change and
+ * this assignment is just reference to the previous one.
+ */
+ public boolean clientEventChange();
+
+ /**
+ * @return Affinity assignment computed by affinity function.
+ */
+ public List<List<ClusterNode>> idealAssignment();
+
+ /**
+ * @return Affinity assignment.
+ */
+ public List<List<ClusterNode>> assignment();
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion();
+
+ /**
+ * Get affinity nodes for partition.
+ *
+ * @param part Partition.
+ * @return Affinity nodes.
+ */
+ public List<ClusterNode> get(int part);
+
+ /**
+ * Get affinity node IDs for partition.
+ *
+ * @param part Partition.
+ * @return Affinity node IDs.
+ */
+ public HashSet<UUID> getIds(int part);
+
+ /**
+ * @return Nodes having primary partition assignments.
+ */
+ public Set<ClusterNode> primaryPartitionNodes();
+
+ /**
+ * Get primary partitions for specified node ID.
+ *
+ * @param nodeId Node ID to get primary partitions for.
+ * @return Primary partitions for specified node ID.
+ */
+ public Set<Integer> primaryPartitions(UUID nodeId);
+
+ /**
+ * Get backup partitions for specified node ID.
+ *
+ * @param nodeId Node ID to get backup partitions for.
+ * @return Backup partitions for specified node ID.
+ */
+ public Set<Integer> backupPartitions(UUID nodeId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 568e4e8..2940d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,12 +27,14 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Cached affinity calculations.
*/
-public class GridAffinityAssignment implements Serializable {
+public class GridAffinityAssignment implements AffinityAssignment, Serializable {
/** */
private static final long serialVersionUID = 0L;
@@ -86,7 +88,7 @@ public class GridAffinityAssignment implements Serializable {
this.topVer = topVer;
this.assignment = assignment;
- this.idealAssignment = idealAssignment;
+ this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;
primary = new HashMap<>();
backup = new HashMap<>();
@@ -139,96 +141,76 @@ public class GridAffinityAssignment implements Serializable {
return topVer;
}
- /**
- * Get affinity nodes for partition.
- *
- * @param part Partition.
- * @return Affinity nodes.
- */
- public List<ClusterNode> get(int part) {
+ /** {@inheritDoc} */
+ @Override public List<ClusterNode> get(int part) {
assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
- " [part=" + part + ", partitions=" + assignment.size() + ']';
+ " [part=" + part + ", partitions=" + assignment.size() + ']';
return assignment.get(part);
}
- /**
- * Get affinity node IDs for partition.
- *
- * @param part Partition.
- * @return Affinity nodes IDs.
- */
- public HashSet<UUID> getIds(int part) {
+ /** {@inheritDoc} */
+ @Override public HashSet<UUID> getIds(int part) {
assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
- " [part=" + part + ", partitions=" + assignment.size() + ']';
+ " [part=" + part + ", partitions=" + assignment.size() + ']';
- List<HashSet<UUID>> assignmentIds0 = assignmentIds;
+ List<ClusterNode> nodes = assignment.get(part);
- if (assignmentIds0 == null) {
- assignmentIds0 = new ArrayList<>();
+ HashSet<UUID> ids = U.newHashSet(nodes.size());
- for (List<ClusterNode> assignmentPart : assignment) {
- HashSet<UUID> partIds = new HashSet<>();
+ for (int i = 0; i < nodes.size(); i++)
+ ids.add(nodes.get(i).id());
- for (ClusterNode node : assignmentPart)
- partIds.add(node.id());
+ return ids;
+ }
- assignmentIds0.add(partIds);
- }
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> primaryPartitionNodes() {
+ Set<ClusterNode> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
- assignmentIds = assignmentIds0;
+ if (!F.isEmpty(nodes))
+ res.add(nodes.get(0));
}
- return assignmentIds0.get(part);
+ return res;
}
- /**
- * @return Nodes having primary partitions assignments.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- public Set<ClusterNode> primaryPartitionNodes() {
- Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes;
-
- if (primaryPartsNodes0 == null) {
- int parts = assignment.size();
-
- primaryPartsNodes0 = new HashSet<>();
-
- for (int p = 0; p < parts; p++) {
- List<ClusterNode> nodes = assignment.get(p);
+ /** {@inheritDoc} */
+ @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+ Set<Integer> res = new HashSet<>();
- if (nodes.size() > 0)
- primaryPartsNodes0.add(nodes.get(0));
- }
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
- primaryPartsNodes = primaryPartsNodes0;
+ if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+ res.add(p);
}
- return primaryPartsNodes0;
+ return res;
}
- /**
- * Get primary partitions for specified node ID.
- *
- * @param nodeId Node ID to get primary partitions for.
- * @return Primary partitions for specified node ID.
- */
- public Set<Integer> primaryPartitions(UUID nodeId) {
- Set<Integer> set = primary.get(nodeId);
+ /** {@inheritDoc} */
+ @Override public Set<Integer> backupPartitions(UUID nodeId) {
+ Set<Integer> res = new HashSet<>();
- return set == null ? Collections.<Integer>emptySet() : set;
- }
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
- /**
- * Get backup partitions for specified node ID.
- *
- * @param nodeId Node ID to get backup partitions for.
- * @return Backup partitions for specified node ID.
- */
- public Set<Integer> backupPartitions(UUID nodeId) {
- Set<Integer> set = backup.get(nodeId);
+ for (int i = 1; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ if (node.id().equals(nodeId)) {
+ res.add(p);
+
+ break;
+ }
+ }
+ }
- return set == null ? Collections.<Integer>emptySet() : set;
+ return res;
}
/**
@@ -274,10 +256,10 @@ public class GridAffinityAssignment implements Serializable {
if (o == this)
return true;
- if (o == null || getClass() != o.getClass())
+ if (o == null || !(o instanceof AffinityAssignment))
return false;
- return topVer.equals(((GridAffinityAssignment)o).topVer);
+ return topVer.equals(((AffinityAssignment)o).topologyVersion());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a81b34d..9166b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache {
private final int partsCnt;
/** Affinity calculation results cache: topology version => partition => nodes. */
- private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
+ private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache;
/** */
private List<List<ClusterNode>> idealAssignment;
@@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache {
/** Full history size. */
private final AtomicInteger fullHistSize = new AtomicInteger();
+ /** */
+ private final SimilarAffinityKey similarAffKey;
+
/**
* Constructs affinity cached calculations.
*
@@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache {
{
assert ctx != null;
assert aff != null;
+ assert nodeFilter != null;
this.ctx = ctx;
this.aff = aff;
@@ -142,6 +146,12 @@ public class GridAffinityAssignmentCache {
partsCnt = aff.partitions();
affCache = new ConcurrentSkipListMap<>();
head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
+
+ similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt);
+ }
+
+ public Object similarAffinityKey() {
+ return similarAffKey;
}
/**
@@ -170,7 +180,7 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
- affCache.put(topVer, assignment);
+ affCache.put(topVer, new HistoryAffinityAssignment(assignment));
head.set(assignment);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -300,7 +310,7 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
- affCache.put(topVer, assignmentCpy);
+ affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
head.set(assignmentCpy);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -328,7 +338,7 @@ public class GridAffinityAssignmentCache {
* @return Affinity assignment.
*/
public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
- GridAffinityAssignment aff = cachedAffinity(topVer);
+ AffinityAssignment aff = cachedAffinity(topVer);
return aff.assignment();
}
@@ -427,7 +437,7 @@ public class GridAffinityAssignmentCache {
* @param topVer Topology version.
* @return Cached affinity.
*/
- public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
+ public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
if (topVer.equals(AffinityTopologyVersion.NONE))
topVer = lastVersion();
else
@@ -435,7 +445,7 @@ public class GridAffinityAssignmentCache {
assert topVer.topologyVersion() >= 0 : topVer;
- GridAffinityAssignment cache = head.get();
+ AffinityAssignment cache = head.get();
if (!cache.topologyVersion().equals(topVer)) {
cache = affCache.get(topVer);
@@ -463,7 +473,7 @@ public class GridAffinityAssignmentCache {
* @return {@code True} if primary changed or required affinity version not found in history.
*/
public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
- GridAffinityAssignment aff = affCache.get(startVer);
+ AffinityAssignment aff = affCache.get(startVer);
if (aff == null)
return false;
@@ -475,7 +485,7 @@ public class GridAffinityAssignmentCache {
ClusterNode primary = nodes.get(0);
- for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
+ for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
List<ClusterNode> nodes0 = assignment.assignment().get(part);
if (nodes0.isEmpty())
@@ -549,10 +559,10 @@ public class GridAffinityAssignmentCache {
}
if (rmvCnt > 0) {
- Iterator<GridAffinityAssignment> it = affCache.values().iterator();
+ Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
while (it.hasNext() && rmvCnt > 0) {
- GridAffinityAssignment aff0 = it.next();
+ AffinityAssignment aff0 = it.next();
it.remove();
@@ -602,4 +612,57 @@ public class GridAffinityAssignmentCache {
return S.toString(AffinityReadyFuture.class, this);
}
}
+
+ /**
+ * Key that is equal for caches with the same affinity function class, node filter class, backups and partitions count.
+ */
+ private static class SimilarAffinityKey {
+ /** Backups value passed to the constructor. */
+ private final int backups;
+
+ /** Affinity function class. */
+ private final Class<?> affFuncCls;
+
+ /** Node filter class. */
+ private final Class<?> filterCls;
+
+ /** Partitions count. */
+ private final int partsCnt;
+
+ /** Hash code, computed once in the constructor from the four fields above. */
+ private final int hash;
+
+ public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
+ this.backups = backups;
+ this.affFuncCls = affFuncCls;
+ this.filterCls = filterCls;
+ this.partsCnt = partsCnt;
+
+ int hash = backups; // All fields are final, so the hash code can be precomputed here.
+ hash = 31 * hash + affFuncCls.hashCode();
+ hash = 31 * hash + filterCls.hashCode();
+ hash= 31 * hash + partsCnt;
+
+ this.hash = hash;
+ }
+
+ @Override public int hashCode() {
+ return hash;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+ return backups == key.backups &&
+ affFuncCls == key.affFuncCls && // Class objects are compared by reference.
+ filterCls == key.filterCls &&
+ partsCnt == key.partsCnt;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 1726d02..7c22ef5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -385,10 +385,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
}
try {
+ AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+ GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+ (GridAffinityAssignment)assign0 :
+ new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
AffinityInfo info = new AffinityInfo(
cctx.config().getAffinity(),
cctx.config().getAffinityMapper(),
- new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)),
+ assign,
cctx.cacheObjectContext());
IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index c24dd2d..abd5292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -180,10 +180,16 @@ class GridAffinityUtils {
cctx.affinity().affinityReadyFuture(topVer).get();
+ AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+ GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+ (GridAffinityAssignment)assign0 :
+ new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
return F.t(
affinityMessage(ctx, cctx.config().getAffinity()),
affinityMessage(ctx, cctx.config().getAffinityMapper()),
- new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)));
+ assign);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
new file mode 100644
index 0000000..e502dd5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Affinity assignment that keeps only the topology version, assignment lists and client event flag of a {@link GridAffinityAssignment}.
+ */
+public class HistoryAffinityAssignment implements AffinityAssignment {
+ /** Topology version of this assignment. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Assignment: list of nodes for each partition, indexed by partition number. */
+ private final List<List<ClusterNode>> assignment;
+
+ /** Ideal assignment taken from the source assignment. */
+ private final List<List<ClusterNode>> idealAssignment;
+
+ /** Client event change flag taken from the source assignment. */
+ private final boolean clientEvtChange;
+
+ /**
+ * @param assign Assignment to take topology version, assignments and client event flag from.
+ */
+ public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+ this.topVer = assign.topologyVersion();
+ this.assignment = assign.assignment();
+ this.idealAssignment = assign.idealAssignment();
+ this.clientEvtChange = assign.clientEventChange();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientEventChange() {
+ return clientEvtChange;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> idealAssignment() {
+ return idealAssignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignment() {
+ return assignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<ClusterNode> get(int part) {
+ assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+ " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+ return assignment.get(part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HashSet<UUID> getIds(int part) {
+ assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+ " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+ List<ClusterNode> nodes = assignment.get(part);
+
+ HashSet<UUID> ids = U.newHashSet(nodes.size()); // A new set is built on every call; nothing is cached.
+
+ for (int i = 0; i < nodes.size(); i++)
+ ids.add(nodes.get(i).id());
+
+ return ids;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> primaryPartitionNodes() {
+ Set<ClusterNode> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ if (!F.isEmpty(nodes))
+ res.add(nodes.get(0)); // First node in a partition's list is taken as its primary.
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+ Set<Integer> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+ res.add(p);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> backupPartitions(UUID nodeId) {
+ Set<Integer> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ for (int i = 1; i < nodes.size(); i++) { // Starts at 1: index 0 is the primary node.
+ ClusterNode node = nodes.get(i);
+
+ if (node.id().equals(nodeId)) {
+ res.add(p);
+
+ break;
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return topVer.hashCode(); // Only the topology version takes part in hashCode/equals.
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || !(o instanceof AffinityAssignment))
+ return false;
+
+ return topVer.equals(((AffinityAssignment)o).topologyVersion()); // Any AffinityAssignment with the same version is equal.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HistoryAffinityAssignment.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1aedf4e..88f1f97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert assignment != null;
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
List<List<ClusterNode>> idealAssignment = aff.idealAssignment();
@@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
else
newAssignment = idealAssignment;
- aff.initialize(topVer, newAssignment);
+ aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
}
});
}
@@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
final Map<Integer, IgniteUuid> deploymentIds = msg.cacheDeploymentIds();
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
AffinityTopologyVersion affTopVer = aff.lastVersion();
@@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assignment.set(part, nodes);
}
- aff.initialize(topVer, assignment);
+ aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
}
else
aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -1206,6 +1210,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
throws IgniteCheckedException {
AffinityTopologyVersion topVer = fut.topologyVersion();
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
if (!crd) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
@@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
boolean latePrimary = cacheCtx.rebalanceEnabled();
- initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary);
+ initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
}
return null;
@@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
boolean latePrimary = cache.rebalanceEnabled;
- initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary);
+ initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
}
});
@@ -1245,7 +1251,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
GridAffinityAssignmentCache aff,
WaitRebalanceInfo rebalanceInfo,
- boolean latePrimary)
+ boolean latePrimary,
+ Map<Object, List<List<ClusterNode>>> affCache)
throws IgniteCheckedException
{
assert lateAffAssign;
@@ -1292,7 +1299,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (newAssignment == null)
newAssignment = idealAssignment;
- aff.initialize(fut.topologyVersion(), newAssignment);
+ aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+ }
+
+ /**
+ * @param aff Affinity cache; its {@code similarAffinityKey()} is used as the lookup key.
+ * @param assign Assignment calculated for this cache.
+ * @param affCache Assignments seen so far, keyed by similar affinity key.
+ * @return The cached list instance if it equals {@code assign}, otherwise {@code assign} itself.
+ */
+ private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff,
+ List<List<ClusterNode>> assign,
+ Map<Object, List<List<ClusterNode>>> affCache) {
+ List<List<ClusterNode>> assign0 = affCache.get(aff.similarAffinityKey());
+
+ if (assign0 != null && assign0.equals(assign))
+ assign = assign0; // Reuse the already cached equal list instance.
+ else
+ affCache.put(aff.similarAffinityKey(), assign); // Store the new list, replacing any non-equal one for this key.
+
+ return assign;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 71ae5c9..6e5a28e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
* @param topVer Topology version.
* @return Affinity assignment.
*/
- public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) {
+ public AffinityAssignment assignment(AffinityTopologyVersion topVer) {
if (cctx.isLocal())
topVer = LOC_CACHE_TOP_VER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 50f7f0f..871a084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -859,7 +859,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
- GridAffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+ AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
List<ClusterNode> affNodes = affAssignment.get(p);
http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 09aec81..d6865c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -600,7 +600,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
", node=" + node + ']');
- GridAffinityAssignment assignment = cctx.affinity().assignment(topVer);
+ AffinityAssignment assignment = cctx.affinity().assignment(topVer);
boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;