You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/18 09:50:07 UTC
[03/15] ignite git commit: ignite-1758 Discovery fixes
ignite-1758 Discovery fixes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80147128
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80147128
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80147128
Branch: refs/heads/ignite-perftest-merge
Commit: 80147128a3b07f927dec65f0a6934f6782efab5c
Parents: 5a116cb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 17 09:48:58 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 17 09:48:58 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 360 +++++++++++----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 95 ++--
.../messages/TcpDiscoveryAbstractMessage.java | 37 ++
.../TcpDiscoveryStatusCheckMessage.java | 11 +
.../tcp/TcpDiscoveryMultiThreadedTest.java | 158 ++++---
.../discovery/tcp/TcpDiscoveryRestartTest.java | 10 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 459 ++++++++++++++++++-
.../TcpDiscoveryMulticastIpFinderSelfTest.java | 28 +-
.../testframework/junits/GridAbstractTest.java | 29 +-
10 files changed, 942 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0fe2881..ae23d0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -159,6 +159,10 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
/** */
+ private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
+ IgniteProductVersion.fromString("1.5.0");
+
+ /** */
private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
@@ -191,10 +195,10 @@ class ServerImpl extends TcpDiscoveryImpl {
private StatisticsPrinter statsPrinter;
/** Failed nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+ private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
/** Leaving nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+ private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
/** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
private boolean ipFinderHasLocAddr;
@@ -1080,13 +1084,34 @@ class ServerImpl extends TcpDiscoveryImpl {
openSock = true;
+ TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
// Handshake.
- spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
- spi.getSocketTimeout()));
+ spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
ackTimeout0));
+ if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+     // Join requests must not be handed to a node that this node already considers failed.
+     boolean ignore = false;
+
+     // The failed nodes collection is mutated under 'mux' elsewhere in this class, so it has
+     // to be read under the same monitor; locking on the collection itself would not exclude
+     // those writers and the iteration could observe a concurrent modification.
+     synchronized (mux) {
+         for (TcpDiscoveryNode failedNode : failedNodes) {
+             if (failedNode.id().equals(res.creatorNodeId())) {
+                 ignore = true;
+
+                 break;
+             }
+         }
+     }
+
+     if (ignore) {
+         // Logging is done outside of the lock to keep the critical section minimal.
+         if (log.isDebugEnabled())
+             log.debug("Ignore response from node from failed list: " + res);
+
+         break;
+     }
+ }
+
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
log.debug("Handshake response from local node: " + res);
@@ -1104,7 +1129,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
if (debugMode)
- debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
+ debugLog(msg, "Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
", rmtNodeId=" + res.creatorNodeId() + ']');
if (log.isDebugEnabled())
@@ -1754,6 +1779,32 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+  * Merges the failed node IDs carried by a received message into the local failed nodes list.
+  * IDs for which the ring returns no node, and the local node itself, are skipped.
+  *
+  * @param msg Message.
+  */
+ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
+     Collection<UUID> ids = msg.failedNodes();
+
+     // Nothing to merge if the message carries no failed nodes.
+     if (ids == null)
+         return;
+
+     for (UUID id : ids) {
+         TcpDiscoveryNode n = ring.node(id);
+
+         if (n == null || n.isLocal())
+             continue;
+
+         boolean isNew;
+
+         synchronized (mux) {
+             isNew = failedNodes.add(n);
+         }
+
+         // Report only nodes that were not in the list before.
+         if (isNew && log.isDebugEnabled())
+             log.debug("Added node to failed nodes list [node=" + n + ", msg=" + msg + ']');
+     }
+ }
+
+ /**
* Discovery messages history used for client reconnect.
*/
private class EnsuredMessageHistory {
@@ -2131,10 +2182,28 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
if (debugMode)
- debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+ debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
+ if (locNode.internalOrder() == 0) {
+ boolean process = false;
+
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ process = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode);
+
+ if (!process) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignore message, local node order is not initialized [msg=" + msg +
+ ", locNode=" + locNode + ']');
+ }
+
+ return;
+ }
+ }
spi.stats.onMessageProcessingStarted(msg);
+ processMessageFailedNodes(msg);
+
if (msg instanceof TcpDiscoveryJoinRequestMessage)
processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
@@ -2200,6 +2269,8 @@ class ServerImpl extends TcpDiscoveryImpl {
checkHeartbeatsReceiving();
checkPendingCustomMessages();
+
+ checkFailedNodesList();
}
/**
@@ -2262,50 +2333,50 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean sent = false;
- boolean searchNext = true;
+ boolean newNextNode = false;
UUID locNodeId = getLocalNodeId();
while (true) {
- if (searchNext) {
- TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
-
- if (newNext == null) {
- if (log.isDebugEnabled())
- log.debug("No next node in topology.");
+ TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
- if (debugMode)
- debugLog("No next node in topology.");
+ if (newNext == null) {
+ if (log.isDebugEnabled())
+ log.debug("No next node in topology.");
- if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
- !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
- msg.senderNodeId(locNodeId);
+ if (debugMode)
+ debugLog(msg, "No next node in topology.");
- addMessage(msg);
- }
+ if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
+ !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
+ msg.senderNodeId(locNodeId);
- break;
+ addMessage(msg);
}
- if (!newNext.equals(next)) {
- if (log.isDebugEnabled())
- log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
- ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+ break;
+ }
- if (debugMode)
- debugLog("New next node [newNext=" + newNext + ", formerNext=" + next +
- ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+ if (!newNext.equals(next)) {
+ if (log.isDebugEnabled())
+ log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
+ ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
- U.closeQuiet(sock);
+ if (debugMode)
+ debugLog(msg, "New next node [newNext=" + newNext + ", formerNext=" + next +
+ ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
- sock = null;
+ U.closeQuiet(sock);
- next = newNext;
- }
- else if (log.isDebugEnabled())
- log.debug("Next node remains the same [nextId=" + next.id() +
- ", nextOrder=" + next.internalOrder() + ']');
+ sock = null;
+
+ next = newNext;
+
+ newNextNode = true;
}
+ else if (log.isDebugEnabled())
+ log.debug("Next node remains the same [nextId=" + next.id() +
+ ", nextOrder=" + next.internalOrder() + ']');
// Flag that shows whether next node exists and accepts incoming connections.
boolean nextNodeExists = sock != null;
@@ -2379,8 +2450,8 @@ class ServerImpl extends TcpDiscoveryImpl {
"expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
if (debugMode)
- debugLog("Failed to restore ring because next node ID received is not as " +
- "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
+ debugLog(msg, "Failed to restore ring because next node ID received is not " +
+ "as expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
break;
}
@@ -2401,8 +2472,8 @@ class ServerImpl extends TcpDiscoveryImpl {
", rcvd=" + nextOrder + ", id=" + next.id() + ']');
if (debugMode)
- debugLog("Failed to restore ring because next node order received " +
- "is not as expected [expected=" + next.internalOrder() +
+ debugLog(msg, "Failed to restore ring because next node order " +
+ "received is not as expected [expected=" + next.internalOrder() +
", rcvd=" + nextOrder + ", id=" + next.id() + ']');
break;
@@ -2413,7 +2484,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Initialized connection with next node: " + next.id());
if (debugMode)
- debugLog("Initialized connection with next node: " + next.id());
+ debugLog(msg, "Initialized connection with next node: " + next.id());
errs = null;
@@ -2477,13 +2548,20 @@ class ServerImpl extends TcpDiscoveryImpl {
assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
- if (failure || forceSndPending) {
+ boolean sndPending=
+ (newNextNode && ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0) ||
+ failure ||
+ forceSndPending;
+
+ if (sndPending) {
if (log.isDebugEnabled())
log.debug("Pending messages will be sent [failure=" + failure +
+ ", newNextNode=" + newNextNode +
", forceSndPending=" + forceSndPending + ']');
if (debugMode)
- debugLog("Pending messages will be sent [failure=" + failure +
+ debugLog(msg, "Pending messages will be sent [failure=" + failure +
+ ", newNextNode=" + newNextNode +
", forceSndPending=" + forceSndPending + ']');
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
@@ -2513,7 +2591,7 @@ class ServerImpl extends TcpDiscoveryImpl {
", res=" + res + ']');
if (debugMode)
- debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+ debugLog(msg, "Pending message has been sent to next node [msgId=" + msg.id() +
", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
@@ -2540,6 +2618,14 @@ class ServerImpl extends TcpDiscoveryImpl {
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ if (!failedNodes.isEmpty()) {
+ for (TcpDiscoveryNode failedNode : failedNodes) {
+ assert !failedNode.equals(next) : failedNode;
+
+ msg.addFailedNode(failedNode.id());
+ }
+ }
+
writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -2548,15 +2634,17 @@ class ServerImpl extends TcpDiscoveryImpl {
onMessageExchanged();
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Message has been sent to next node [msg=" + msg +
", next=" + next.id() +
", res=" + res + ']');
+ }
- if (debugMode)
- debugLog("Message has been sent to next node [msg=" + msg +
+ if (debugMode) {
+ debugLog(msg, "Message has been sent to next node [msg=" + msg +
", next=" + next.id() +
", res=" + res + ']');
+ }
}
finally {
clearNodeAddedMessage(msg);
@@ -2635,8 +2723,6 @@ class ServerImpl extends TcpDiscoveryImpl {
next = null;
- searchNext = true;
-
errs = null;
}
else
@@ -2665,25 +2751,30 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));
if (!sent) {
+ assert next == null : next;
+
if (log.isDebugEnabled())
log.debug("Pending messages will be resent to local node");
if (debugMode)
- log.debug("Pending messages will be resent to local node");
+ debugLog(msg, "Pending messages will be resent to local node");
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
pendingMsgs.customDiscardId);
+ pendingMsg.senderNodeId(locNodeId);
+
msgWorker.addMessage(pendingMsg);
if (log.isDebugEnabled())
log.debug("Pending message has been sent to local node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+ ", pendingMsgId=" + pendingMsg + ']');
- if (debugMode)
- debugLog("Pending message has been sent to local node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+ if (debugMode) {
+ debugLog(msg, "Pending message has been sent to local node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ']');
+ }
}
}
@@ -3317,15 +3408,17 @@ class ServerImpl extends TcpDiscoveryImpl {
if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
- "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
- + locNode + ", msg=" + msg + ']');
+ "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ + locNode + ", msg=" + msg + ']');
+ }
- if (debugMode)
- debugLog("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
- "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
- + locNode + ", msg=" + msg + ']');
+ if (debugMode) {
+ debugLog(msg, "Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
+ "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ + locNode + ", msg=" + msg + ']');
+ }
return;
}
@@ -3338,7 +3431,7 @@ class ServerImpl extends TcpDiscoveryImpl {
", msg=" + msg + ']');
if (debugMode)
- debugLog("Discarding node added message since new node's order is less than " +
+ debugLog(msg, "Discarding node added message since new node's order is less than " +
"max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode +
", msg=" + msg + ']');
@@ -3427,6 +3520,8 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.onExchange(node.id(), node.id(), data, U.gridClassLoader());
msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+
+ processMessageFailedNodes(msg);
}
if (log.isDebugEnabled())
@@ -3447,6 +3542,9 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
+ assert n.internalOrder() < node.internalOrder() :
+ "Invalid node [topNode=" + n + ", added=" + node + ']';
+
// Make all preceding nodes and local node visible.
n.visible(true);
}
@@ -3500,6 +3598,8 @@ class ServerImpl extends TcpDiscoveryImpl {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
}
+
+ processMessageFailedNodes(msg);
}
if (sendMessageToRemotes(msg))
@@ -3733,7 +3833,7 @@ class ServerImpl extends TcpDiscoveryImpl {
interruptPing(leavingNode);
- assert leftNode != null;
+ assert leftNode != null : msg;
if (log.isDebugEnabled())
log.debug("Removed node from topology: " + leftNode);
@@ -3887,6 +3987,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (node != null) {
+ assert !node.isLocal() : msg;
+
synchronized (mux) {
failedNodes.add(node);
}
@@ -4036,32 +4138,46 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ TcpDiscoveryStatusCheckMessage msg0 = msg;
+
+ if (F.contains(msg.failedNodes(), msg.creatorNodeId())) {
+ msg0 = new TcpDiscoveryStatusCheckMessage(msg);
+
+ msg0.failedNodes(null);
+
+ for (UUID failedNodeId : msg.failedNodes()) {
+ if (!failedNodeId.equals(msg.creatorNodeId()))
+ msg0.addFailedNode(failedNodeId);
+ }
+ }
+
try {
- trySendMessageDirectly(msg.creatorNode(), msg);
+ trySendMessageDirectly(msg0.creatorNode(), msg0);
if (log.isDebugEnabled())
log.debug("Responded to status check message " +
- "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']');
+ "[recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']');
}
catch (IgniteSpiException e) {
if (e.hasCause(SocketException.class)) {
if (log.isDebugEnabled())
log.debug("Failed to respond to status check message (connection " +
- "refused) [recipient=" + msg.creatorNodeId() + ", status=" +
- msg.status() + ']');
+ "refused) [recipient=" + msg0.creatorNodeId() + ", status=" +
+ msg0.status() + ']');
onException("Failed to respond to status check message (connection refused) " +
- "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
+ "[recipient=" + msg0.creatorNodeId() + ", status=" + msg0.status() + ']', e);
}
- else {
- if (pingNode(msg.creatorNode()))
+ else if (!spi.isNodeStopping0()) {
+ if (pingNode(msg0.creatorNode()))
// Node exists and accepts incoming connections.
U.error(log, "Failed to respond to status check message [recipient=" +
- msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
- else if (log.isDebugEnabled())
- log.debug("Failed to respond to status check message (did the node " +
- "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + msg.status()
- + ']');
+ msg0.creatorNodeId() + ", status=" + msg0.status() + ']', e);
+ else if (log.isDebugEnabled()) {
+     // Ping failed as well: the recipient is most likely gone, so this is only worth a debug
+     // record. A space is kept before '[' so the parameters do not run into the text.
+     log.debug("Failed to respond to status check message (did the node stop?) " +
+         "[recipient=" + msg0.creatorNodeId() +
+         ", status=" + msg0.status() + ']');
+ }
}
}
}
@@ -4364,27 +4480,42 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
- if (!joiningNodes.isEmpty()) {
+ boolean delayMsg;
+
+ assert ring.minimumNodeVersion() != null : ring;
+
+ if (ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0)
+ delayMsg = msg.topologyVersion() == 0L && !joiningNodes.isEmpty();
+ else
+ delayMsg = !joiningNodes.isEmpty();
+
+ if (delayMsg) {
+ if (log.isDebugEnabled()) {
+ log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
+ ", joiningNodes=" + joiningNodes + ']');
+ }
+
pendingCustomMsgs.add(msg);
return;
}
- boolean sndNext = !msg.verified();
-
- if (sndNext) {
+ if (!msg.verified()) {
msg.verify(getLocalNodeId());
msg.topologyVersion(ring.topologyVersion());
- if (pendingMsgs.procCustomMsgs.add(msg.id()))
+ if (pendingMsgs.procCustomMsgs.add(msg.id())) {
notifyDiscoveryListener(msg);
- else
- sndNext = false;
- }
- if (sndNext && ring.hasRemoteNodes())
- sendMessageAcrossRing(msg);
+ if (sendMessageToRemotes(msg))
+ sendMessageAcrossRing(msg);
+ else
+ processCustomMessage(msg);
+ }
+ }
else {
+ addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
+
spi.stats.onRingMessageReceived(msg);
DiscoverySpiCustomMessage msgObj = null;
@@ -4401,16 +4532,21 @@ class ServerImpl extends TcpDiscoveryImpl {
if (nextMsg != null) {
try {
- addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
- spi.marsh.marshal(nextMsg)));
+ TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
+ getLocalNodeId(), nextMsg, spi.marsh.marshal(nextMsg));
+
+ ackMsg.topologyVersion(msg.topologyVersion());
+
+ processCustomMessage(ackMsg);
+
+ if (ackMsg.verified())
+ msgHist.add(ackMsg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery custom message.", e);
}
}
}
-
- addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
}
}
else {
@@ -4428,9 +4564,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
- assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() +
- ", topver=" + ring.topologyVersion();
- assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topver=" + ring.topologyVersion();
+ assert msg.topologyVersion() == ring.topologyVersion() :
+ "msg: " + msg + ", topVer=" + ring.topologyVersion();
notifyDiscoveryListener(msg);
}
@@ -4441,6 +4576,38 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+  * Walks the failed nodes list: entries that are no longer present in the ring are dropped from
+  * the list, and for every entry that is still in the ring a {@link TcpDiscoveryNodeFailedMessage}
+  * is queued via {@code addMessage}.
+  */
+ private void checkFailedNodesList() {
+     List<TcpDiscoveryNodeFailedMessage> pending = null;
+
+     synchronized (mux) {
+         Iterator<TcpDiscoveryNode> iter = failedNodes.iterator();
+
+         while (iter.hasNext()) {
+             TcpDiscoveryNode failed = iter.next();
+
+             if (ring.node(failed.id()) == null) {
+                 // Node already left the ring, nothing to report for it.
+                 iter.remove();
+
+                 continue;
+             }
+
+             // Allocated lazily: most of the time there is nothing to send.
+             if (pending == null)
+                 pending = new ArrayList<>(failedNodes.size());
+
+             pending.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), failed.id(), failed.internalOrder()));
+         }
+     }
+
+     if (pending == null)
+         return;
+
+     // Messages are queued after the lock has been released.
+     for (TcpDiscoveryNodeFailedMessage failedMsg : pending) {
+         if (log.isDebugEnabled())
+             log.debug("Add node failed message for node from failed nodes list: " + failedMsg);
+
+         addMessage(failedMsg);
+     }
+ }
+
+ /**
* Checks and flushes custom event messages if no nodes are attempting to join the grid.
*/
private void checkPendingCustomMessages() {
@@ -4640,10 +4807,10 @@ class ServerImpl extends TcpDiscoveryImpl {
synchronized (mux) {
readers.add(reader);
-
- reader.start();
}
+ reader.start();
+
spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp);
}
}
@@ -4861,9 +5028,10 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Initialized connection with remote node [nodeId=" + nodeId +
", client=" + req.client() + ']');
- if (debugMode)
- debugLog("Initialized connection with remote node [nodeId=" + nodeId +
+ if (debugMode) {
+ debugLog(msg, "Initialized connection with remote node [nodeId=" + nodeId +
", client=" + req.client() + ']');
+ }
}
catch (IOException e) {
if (log.isDebugEnabled())
@@ -4932,7 +5100,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageReceived(msg);
if (debugMode && recordable(msg))
- debugLog("Message has been received: " + msg);
+ debugLog(msg, "Message has been received: " + msg);
if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 2786d0b..1aef728 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -99,9 +100,10 @@ abstract class TcpDiscoveryImpl {
}
/**
+ * @param discoMsg Discovery message.
* @param msg Message.
*/
- protected void debugLog(String msg) {
+ protected void debugLog(@Nullable TcpDiscoveryAbstractMessage discoMsg, String msg) {
assert debugMode;
String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 7ca092c..eb0f74a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.PN;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
@@ -88,6 +89,23 @@ public class TcpDiscoveryNodesRing {
@GridToStringExclude
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ /** Minimum product version among ring nodes, recomputed by {@code initializeMinimumVersion()} and read via {@code minimumNodeVersion()}. */
+ private IgniteProductVersion minNodeVer;
+
+ /**
+  * Reads the cached minimum node version while holding the ring's read lock.
+  *
+  * @return Minimum node version.
+  */
+ public IgniteProductVersion minimumNodeVersion() {
+     rwLock.readLock().lock();
+
+     try {
+         IgniteProductVersion ver = minNodeVer;
+
+         return ver;
+     }
+     finally {
+         rwLock.readLock().unlock();
+     }
+ }
+
/**
* Sets local node.
*
@@ -225,6 +243,8 @@ public class TcpDiscoveryNodesRing {
nodeOrder = node.internalOrder();
maxInternalOrder = node.internalOrder();
+
+ initializeMinimumVersion();
}
finally {
rwLock.writeLock().unlock();
@@ -295,6 +315,8 @@ public class TcpDiscoveryNodesRing {
}
nodeOrder = topVer;
+
+ initializeMinimumVersion();
}
finally {
rwLock.writeLock().unlock();
@@ -341,6 +363,8 @@ public class TcpDiscoveryNodesRing {
nodes.remove(rmv);
}
+ initializeMinimumVersion();
+
return rmv;
}
finally {
@@ -372,6 +396,8 @@ public class TcpDiscoveryNodesRing {
maxInternalOrder = 0;
topVer = 0;
+
+ minNodeVer = locNode.version();
}
finally {
rwLock.writeLock().unlock();
@@ -451,61 +477,8 @@ public class TcpDiscoveryNodesRing {
* topology contains less than two nodes.
*/
@Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
- assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
-
- rwLock.readLock().lock();
-
- try {
- Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
-
- if (filtered.size() < 2)
- return null;
-
- Iterator<TcpDiscoveryNode> iter = filtered.iterator();
-
- while (iter.hasNext()) {
- TcpDiscoveryNode node = iter.next();
-
- if (locNode.equals(node))
- break;
- }
-
- return iter.hasNext() ? iter.next() : F.first(filtered);
- }
- finally {
- rwLock.readLock().unlock();
- }
- }
-
- /**
- * Finds previous node in the topology.
- *
- * @return Previous node.
- */
- @Nullable public TcpDiscoveryNode previousNode() {
- rwLock.readLock().lock();
-
- try {
- if (nodes.size() < 2)
- return null;
-
- return previousNode(null);
- }
- finally {
- rwLock.readLock().unlock();
- }
- }
-
- /**
- * Finds previous node in the topology filtering excluded nodes from search.
- *
- * @param excluded Nodes to exclude from the search (optional). If provided,
- * cannot contain local node.
- * @return Previous node or {@code null} if all nodes were filtered out or
- * topology contains less than two nodes.
- */
- @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
- assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+ assert locNode.internalOrder() > 0 : locNode;
+ assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded;
rwLock.readLock().lock();
@@ -638,6 +611,18 @@ public class TcpDiscoveryNodesRing {
});
}
+ /**
+  * Recomputes {@code minNodeVer} as the lowest version among the entries of {@code nodes};
+  * the result is {@code null} when {@code nodes} is empty. The call sites visible in this
+  * change invoke it before the write lock is released.
+  */
+ private void initializeMinimumVersion() {
+     IgniteProductVersion min = null;
+
+     for (TcpDiscoveryNode n : nodes) {
+         IgniteProductVersion ver = n.version();
+
+         if (min == null || ver.compareTo(min) < 0)
+             min = ver;
+     }
+
+     minNodeVer = min;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
rwLock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 875d18e..9cb47af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -19,10 +19,15 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.io.Externalizable;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
/**
* Base class to implement discovery messages.
@@ -62,6 +67,10 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
/** Pending message index. */
private short pendingIdx;
+ /** */
+ @GridToStringInclude
+ private Set<UUID> failedNodes;
+
/**
* Default no-arg constructor for {@link Externalizable} interface.
*/
@@ -236,6 +245,34 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
return false;
}
+ /**
+  * Registers a node ID as failed in this message, creating the backing set on first use.
+  *
+  * @param nodeId Node ID, must not be {@code null}.
+  */
+ public void addFailedNode(UUID nodeId) {
+     assert nodeId != null;
+
+     Set<UUID> ids = failedNodes;
+
+     // The set is created lazily so that messages without failed nodes carry no collection.
+     if (ids == null)
+         failedNodes = ids = new HashSet<>();
+
+     ids.add(nodeId);
+ }
+
+ /**
+ * Replaces the failed nodes set of this message. The given set is stored as is (no copy);
+ * passing {@code null} clears the list.
+ *
+ * @param failedNodes Failed nodes.
+ */
+ public void failedNodes(@Nullable Set<UUID> failedNodes) {
+ this.failedNodes = failedNodes;
+ }
+
+ /**
+ * Returns the failed node IDs attached to this message. The internal set is returned
+ * directly, without a defensive copy.
+ *
+ * @return Failed nodes IDs, may be {@code null}.
+ */
+ @Nullable public Collection<UUID> failedNodes() {
+ return failedNodes;
+ }
+
/** {@inheritDoc} */
@Override public final boolean equals(Object obj) {
if (this == obj)
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index 6118f4d..70b0080 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -62,6 +62,17 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
}
/**
+ * @param msg Message to copy (creator node, failed node ID and status are taken as-is, without deep copy).
+ */
+ public TcpDiscoveryStatusCheckMessage(TcpDiscoveryStatusCheckMessage msg) {
+ super(msg);
+
+ this.creatorNode = msg.creatorNode;
+ this.failedNodeId = msg.failedNodeId;
+ this.status = msg.status;
+ }
+
+ /**
* Gets creator node.
*
* @return Creator node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 55474dc..5053c2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -212,6 +213,22 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
public void testMultiThreadedClientsServersRestart() throws Throwable {
fail("https://issues.apache.org/jira/browse/IGNITE-1123");
+ multiThreadedClientsServersRestart(GRID_CNT, CLIENT_GRID_CNT);
+ }
+
+ /**
+ * @throws Throwable If any error occurs.
+ */
+ public void _testMultiThreadedServersRestart() throws Throwable {
+ multiThreadedClientsServersRestart(GRID_CNT * 2, 0); // Servers only: doubled server count, no clients.
+ }
+
+ /**
+ * @param srvs Number of servers.
+ * @param clients Number of clients.
+ * @throws Throwable If any error occurs.
+ */
+ private void multiThreadedClientsServersRestart(int srvs, int clients) throws Throwable {
final AtomicBoolean done = new AtomicBoolean();
try {
@@ -219,91 +236,95 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
- startGridsMultiThreaded(GRID_CNT);
-
- clientFlagGlobal = true;
+ startGridsMultiThreaded(srvs);
- startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+ IgniteInternalFuture<?> clientFut = null;
final AtomicReference<Throwable> error = new AtomicReference<>();
- final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
-
- for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
- clientStopIdxs.add(i);
+ if (clients > 0) {
+ clientFlagGlobal = true;
- final AtomicInteger clientStartIdx = new AtomicInteger(9000);
+ startGridsMultiThreaded(srvs, clients);
- IgniteInternalFuture<?> fut1 = multithreadedAsync(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- clientFlagPerThread.set(true);
+ final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
- while (!done.get() && error.get() == null) {
- Integer stopIdx = clientStopIdxs.take();
+ for (int i = srvs; i < srvs + clients; i++)
+ clientStopIdxs.add(i);
- log.info("Stop client: " + stopIdx);
+ final AtomicInteger clientStartIdx = new AtomicInteger(9000);
- stopGrid(stopIdx);
+ clientFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ clientFlagPerThread.set(true);
while (!done.get() && error.get() == null) {
- // Generate unique name to simplify debugging.
- int startIdx = clientStartIdx.getAndIncrement();
+ Integer stopIdx = clientStopIdxs.take();
- log.info("Start client: " + startIdx);
+ log.info("Stop client: " + stopIdx);
- UUID id = UUID.randomUUID();
+ stopGrid(stopIdx);
- nodeId.set(id);
+ while (!done.get() && error.get() == null) {
+ // Generate unique name to simplify debugging.
+ int startIdx = clientStartIdx.getAndIncrement();
- try {
- Ignite ignite = startGrid(startIdx);
+ log.info("Start client: " + startIdx);
- assertTrue(ignite.configuration().isClientMode());
+ UUID id = UUID.randomUUID();
- clientStopIdxs.add(startIdx);
+ nodeId.set(id);
- break;
- }
- catch (Exception e) {
- if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
- X.hasCause(e, IgniteClientDisconnectedException.class))
- log.info("Client disconnected: " + e);
- else if (X.hasCause(e, ClusterTopologyCheckedException.class))
- log.info("Client failed to start: " + e);
- else {
- if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
- log.info("Client failed: " + e);
- else
- throw e;
+ try {
+ Ignite ignite = startGrid(startIdx);
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ clientStopIdxs.add(startIdx);
+
+ break;
+ }
+ catch (Exception e) {
+ if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
+ X.hasCause(e, IgniteClientDisconnectedException.class))
+ log.info("Client disconnected: " + e);
+ else if (X.hasCause(e, ClusterTopologyCheckedException.class))
+ log.info("Client failed to start: " + e);
+ else {
+ if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
+ log.info("Client failed: " + e);
+ else
+ throw e;
+ }
}
}
}
}
- }
- catch (Throwable e) {
- log.error("Unexpected error: " + e, e);
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
- error.compareAndSet(null, e);
+ error.compareAndSet(null, e);
+
+ return null;
+ }
return null;
}
-
- return null;
- }
- },
- CLIENT_GRID_CNT,
- "client-restart");
+ },
+ clients,
+ "client-restart");
+ }
final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>();
- for (int i = 0; i < GRID_CNT; i++)
+ for (int i = 0; i < srvs; i++)
srvStopIdxs.add(i);
- final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT);
+ final AtomicInteger srvStartIdx = new AtomicInteger(srvs + clients);
- IgniteInternalFuture<?> fut2 = multithreadedAsync(
+ IgniteInternalFuture<?> srvFut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
try {
@@ -312,6 +333,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
while (!done.get() && error.get() == null) {
int stopIdx = srvStopIdxs.take();
+ U.sleep(50);
+
+ Thread.currentThread().setName("stop-server-" + getTestGridName(stopIdx));
+
log.info("Stop server: " + stopIdx);
stopGrid(stopIdx);
@@ -319,13 +344,20 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
// Generate unique name to simplify debugging.
int startIdx = srvStartIdx.getAndIncrement();
+ Thread.currentThread().setName("start-server-" + getTestGridName(startIdx));
+
log.info("Start server: " + startIdx);
- Ignite ignite = startGrid(startIdx);
+ try {
+ Ignite ignite = startGrid(startIdx);
- assertFalse(ignite.configuration().isClientMode());
+ assertFalse(ignite.configuration().isClientMode());
- srvStopIdxs.add(startIdx);
+ srvStopIdxs.add(startIdx);
+ }
+ catch (IgniteCheckedException e) {
+ log.info("Failed to start: " + e);
+ }
}
}
catch (Throwable e) {
@@ -339,7 +371,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
return null;
}
},
- GRID_CNT - 1,
+ srvs - 1,
"server-restart");
final long timeToExec = getTestTimeout() - 60_000;
@@ -356,8 +388,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
done.set(true);
- fut1.cancel();
- fut2.cancel();
+ if (clientFut != null)
+ clientFut.cancel();
+
+ srvFut.cancel();
throw err;
}
@@ -367,8 +401,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
done.set(true);
- fut1.get();
- fut2.get();
+ if (clientFut != null)
+ clientFut.get();
+
+ srvFut.get();
}
finally {
done.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
index 8b94f54..7beeb41 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -196,8 +197,15 @@ public class TcpDiscoveryRestartTest extends GridCommonAbstractTest {
/**
* @param nodeId Node ID.
+ * @throws Exception If failed.
*/
- void checkEvents(UUID nodeId) {
+ void checkEvents(final UUID nodeId) throws Exception {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return joinIds.contains(nodeId) && leftIds.contains(nodeId);
+ }
+ }, 5000);
+
assertTrue("No join event: " + nodeId, joinIds.contains(nodeId));
assertTrue("No left event: " + nodeId, leftIds.contains(nodeId));
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 51d8a2d..379a3a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -21,16 +21,19 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +41,8 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -46,12 +51,16 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
@@ -64,8 +73,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -94,7 +103,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private UUID nodeId;
/** */
- private TcpDiscoverySpi nodeSpi;
+ private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();
/**
* @throws Exception If fails.
@@ -104,15 +113,17 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @SuppressWarnings({"IfMayBeConditional", "deprecation"})
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi spi = nodeSpi;
+ TcpDiscoverySpi spi = nodeSpi.get();
- if (spi == null)
+ if (spi == null) {
spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+ }
+ else
+ nodeSpi.set(null);
discoMap.put(gridName, spi);
@@ -176,6 +187,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ discoMap = null; // Drop the SPI instances recorded by getConfiguration().
+
+ super.afterTest();
+ }
+
/**
* @throws Exception If any error occurs.
*/
@@ -1202,11 +1220,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception {
TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
- nodeSpi = spi0;
+ nodeSpi.set(spi0);
final Ignite ignite0 = startGrid(0);
- nodeSpi = new TestCustomEventRaceSpi();
+ nodeSpi.set(new TestCustomEventRaceSpi());
final Ignite ignite1 = startGrid(1);
@@ -1221,7 +1239,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override public Void call() throws Exception {
log.info("Start 2");
- nodeSpi = new TestCustomEventRaceSpi();
+ nodeSpi.set(new TestCustomEventRaceSpi());
Ignite ignite2 = startGrid(2);
@@ -1271,7 +1289,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
assertEquals(1, cache.get(1));
- nodeSpi = new TestCustomEventRaceSpi();
+ nodeSpi.set(new TestCustomEventRaceSpi());
Ignite ignite = startGrid(3);
@@ -1314,15 +1332,15 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private void customEventCoordinatorFailure(boolean twoNodes) throws Exception {
TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi();
- nodeSpi = spi0;
+ nodeSpi.set(spi0);
Ignite ignite0 = startGrid(0);
- nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+ nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
Ignite ignite1 = startGrid(1);
- nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+ nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
Ignite ignite2 = twoNodes ? null : startGrid(2);
@@ -1366,7 +1384,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
log.info("Try start one more node.");
- nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+ nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
Ignite ignite = startGrid(twoNodes ? 2 : 3);
@@ -1381,6 +1399,421 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * Coordinator is added to the failed list during node start.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes1() throws Exception {
+ try {
+ final int FAIL_ORDER = 3; // SPI of the node whose order is 3 fails its first node-added message send.
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ startGrid(1);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ Ignite ignite2 = startGrid(2);
+
+ assertEquals(2, ignite2.cluster().nodes().size()); // Third node is expected to see a two-node topology.
+
+ waitNodeStop(ignite0.name()); // First started node is expected to stop.
+
+ tryCreateCache(2); // Exactly two nodes remain and each of them can create a cache.
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Coordinator is added to the failed list while several nodes start concurrently.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes2() throws Exception {
+ try {
+ final int FAIL_ORDER = 3; // SPI of the node whose order is 3 fails its first node-added message send.
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ startGrid(1);
+
+ final AtomicInteger nodeIdx = new AtomicInteger(1); // Grids 0 and 1 already exist; threads take 2, 3, 4.
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = nodeIdx.incrementAndGet();
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); // nodeSpi is thread-local: one SPI per starter thread.
+
+ startGrid(idx);
+
+ return null;
+ }
+ }, 3, "start-node");
+
+ Ignite ignite2 = ignite(2);
+
+ waitForRemoteNodes(ignite2, 3);
+
+ waitNodeStop(ignite0.name()); // First started node is expected to stop.
+
+ tryCreateCache(4); // Four of the five started nodes are expected to remain.
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Coordinator is added to the failed list during node start, test with two nodes.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes3() throws Exception {
+ try {
+ nodeSpi.set(new TestFailedNodesSpi(-1)); // -1 presumably matches no node order, so no order-based failure.
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(2)); // Node whose order is 2 fails its first node-added message send.
+
+ Ignite ignite1 = startGrid(1);
+
+ assertEquals(1, ignite1.cluster().nodes().size()); // Second node is expected to see only itself.
+
+ waitNodeStop(ignite0.name()); // First started node is expected to stop.
+
+ ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1);
+
+ startGrid(2);
+
+ assertEquals(2, ignite1.cluster().nodes().size());
+
+ tryCreateCache(2);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Coordinator is added to the failed list during node start, but the node that detected the failure dies
+ * before sending {@link TcpDiscoveryNodeFailedMessage}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes4() throws Exception {
+ try {
+ final int FAIL_ORDER = 3; // SPI of the node whose order is 3 fails its first node-added message send.
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ Ignite ignite1 = startGrid(1);
+
+ TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER);
+
+ spi.stopBeforeSndFail = true; // This node stops itself instead of sending the node-failed message.
+
+ nodeSpi.set(spi);
+
+ Ignite ignite2 = startGrid(2);
+
+ waitNodeStop(ignite2.name()); // The node that detected the failure is expected to stop.
+
+ log.info("Try start new node.");
+
+ Ignite ignite3 = startGrid(3);
+
+ waitNodeStop(ignite0.name()); // First started node is expected to stop as well.
+
+ assertEquals(2, ignite1.cluster().nodes().size());
+ assertEquals(2, ignite3.cluster().nodes().size());
+
+ tryCreateCache(2);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Adds some node to the failed list after the join process has finished.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes5() throws Exception {
+ try {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int iter = 0; iter < 3; iter++) {
+ final int NODES = iter == 0 ? 2 : rnd.nextInt(3, 6); // Two nodes on the first pass, then 3 to 5.
+
+ for (int i = 0; i < NODES; i++) {
+ nodeSpi.set(new TestFailedNodesSpi(-1)); // -1 presumably matches no node order.
+
+ startGrid(i);
+ }
+
+ Map<Long, Ignite> nodes = new HashMap<>(); // Started nodes keyed by the order of their local node.
+
+ for (int i = 0; i < NODES; i++) {
+ Ignite ignite = ignite(i);
+
+ nodes.put(ignite.cluster().localNode().order(), ignite);
+ }
+
+ Ignite ignite = ignite(rnd.nextInt(NODES));
+
+ log.info("Iteration [iter=" + iter + ", nodes=" + NODES + ", failFrom=" + ignite.name() + ']');
+
+ TestFailedNodesSpi spi = (TestFailedNodesSpi)ignite.configuration().getDiscoverySpi();
+
+ spi.failSingleMsg = true; // Next message written by this node's SPI fails with an I/O error.
+
+ long order = ignite.cluster().localNode().order();
+
+ long nextOrder = order == NODES ? 1 : order + 1; // Order after the chosen node, wrapping to 1 at the end.
+
+ Ignite failingNode = nodes.get(nextOrder);
+
+ assertNotNull(failingNode);
+
+ waitNodeStop(failingNode.name());
+
+ Ignite newNode = startGrid(NODES);
+
+ assertEquals(NODES, newNode.cluster().nodes().size()); // One node stopped, one joined.
+
+ tryCreateCache(NODES);
+
+ stopAllGrids();
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCustomEventAckNotSend() throws Exception {
+ try {
+ TestCustomerEventAckSpi spi0 = new TestCustomerEventAckSpi();
+
+ nodeSpi.set(spi0);
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestCustomerEventAckSpi());
+
+ Ignite ignite1 = startGrid(1);
+
+ spi0.stopBeforeSndAck = true; // Node 0 now drops the routine-start ack and stops itself.
+
+ ignite1.message().remoteListen("test", new DummyPredicate()); // Presumably triggers the routine-start ack.
+
+ waitNodeStop(ignite0.name());
+
+ startGrid(2); // A new node must still be able to start after node 0 stopped.
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param nodeName Name of the node that is expected to stop.
+ * @throws Exception If failed.
+ */
+ private void waitNodeStop(final String nodeName) throws Exception {
+ boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ Ignition.ignite(nodeName); // Lookup succeeds, so the node is treated as still running.
+
+ return false;
+ }
+ catch (IgniteIllegalStateException e) {
+ return true; // Lookup failed: the node is treated as stopped.
+ }
+ }
+ }, 10_000);
+
+ if (!wait)
+ U.dumpThreads(log); // Dump threads before failing the test.
+
+ assertTrue("Failed to wait for node stop.", wait);
+ }
+
+ /**
+ * @param expNodes Expected number of started nodes, checked against {@code G.allGrids()}.
+ */
+ private void tryCreateCache(int expNodes) {
+ List<Ignite> allNodes = G.allGrids();
+
+ assertEquals(expNodes, allNodes.size());
+
+ int cntr = 0;
+
+ for (Ignite ignite : allNodes) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName("cache-" + cntr++); // Distinct cache name for every node.
+
+ log.info("Try create cache [node=" + ignite.name() + ", cache=" + ccfg.getName() + ']');
+
+ ignite.getOrCreateCache(ccfg).put(1, 1); // Create the cache from this node and do one put.
+ }
+ }
+
+ /**
+ * Predicate that always returns {@code true}; passed to {@code remoteListen} in {@code testCustomEventAckNotSend}.
+ */
+ static class DummyPredicate implements IgniteBiPredicate<UUID, Object> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object o) {
+ return true;
+ }
+ }
+
+ /**
+ * SPI that can drop a custom event wrapping {@link StartRoutineAckDiscoveryMessage} and stop the local node.
+ */
+ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi {
+ /** When set, the next routine-start ack is not sent and the local node is stopped instead. */
+ private volatile boolean stopBeforeSndAck;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
+ TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (stopBeforeSndAck) {
+ if (msg instanceof TcpDiscoveryCustomEventMessage) {
+ try {
+ DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(
+ ((TcpDiscoveryCustomEventMessage)msg).message(marsh), "delegate"); // Read by field name.
+
+ if (custMsg instanceof StartRoutineAckDiscoveryMessage) {
+ log.info("Skip message send and stop node: " + msg);
+
+ sock.close();
+
+ GridTestUtils.runAsync(new Callable<Object>() { // Stop the node from a separate thread.
+ @Override public Object call() throws Exception {
+ ignite.close();
+
+ return null;
+ }
+ }, "stop-node");
+
+ return; // The message is never written to the socket.
+ }
+ }
+ catch (Throwable e) {
+ fail("Unexpected error: " + e);
+ }
+ }
+ }
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
+ * Simulates the scenario when a node detects another node's failure while sending a message, though it is alive.
+ */
+ private static class TestFailedNodesSpi extends TcpDiscoverySpi {
+ /** Ensures the order-based failure of a node-added message send happens only once. */
+ private final AtomicBoolean failMsg = new AtomicBoolean();
+
+ /** Local node order at which this SPI simulates an I/O error. */
+ private final int failOrder;
+
+ /** If set, the node stops itself instead of sending a node-failed message. Volatile: set by the test thread. */
+ private volatile boolean stopBeforeSndFail;
+
+ /** Set once the node decided to stop; volatile so that writes from other threads also see it and are dropped. */
+ private volatile boolean stop;
+
+ /** If set, the next write fails once with an I/O error (check-and-reset is not atomic). */
+ private volatile boolean failSingleMsg;
+
+ /**
+ * @param failOrder Spi fails connection if local node order equals to this order.
+ */
+ TestFailedNodesSpi(int failOrder) {
+ this.failOrder = failOrder;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
+ TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (stop)
+ return; // Node is stopping: silently drop every message.
+
+ if (failSingleMsg) {
+ failSingleMsg = false;
+
+ log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');
+
+ sock.close();
+
+ throw new SocketTimeoutException();
+ }
+
+ if (locNode.internalOrder() == failOrder &&
+ (msg instanceof TcpDiscoveryNodeAddedMessage) &&
+ failMsg.compareAndSet(false, true)) {
+ log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');
+
+ sock.close();
+
+ throw new SocketTimeoutException();
+ }
+
+ if (stopBeforeSndFail &&
+ locNode.internalOrder() == failOrder &&
+ (msg instanceof TcpDiscoveryNodeFailedMessage)) {
+ stop = true;
+
+ log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']');
+
+ sock.close();
+
+ GridTestUtils.runAsync(new Callable<Object>() { // Stop the node from a separate thread.
+ @Override public Object call() throws Exception {
+ ignite.close();
+
+ return null;
+ }
+ }, "stop-node");
+
+ return;
+ }
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
*
*/
private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
index 97ba5cf..1e710ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
@@ -58,14 +58,20 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
try {
ipFinder1 = ipFinder();
+ ipFinder1.setResponseWaitTime(1000);
+ ipFinder1.setAddressRequestAttempts(10);
ipFinder2 = new TcpDiscoveryMulticastIpFinder();
+ ipFinder2.setResponseWaitTime(1000);
+ ipFinder2.setAddressRequestAttempts(10);
ipFinder2.setMulticastGroup(ipFinder1.getMulticastGroup());
ipFinder2.setMulticastPort(ipFinder1.getMulticastPort());
ipFinder3 = new TcpDiscoveryMulticastIpFinder();
+ ipFinder3.setResponseWaitTime(1000);
+ ipFinder3.setAddressRequestAttempts(10);
ipFinder3.setMulticastGroup(ipFinder1.getMulticastGroup());
ipFinder3.setMulticastPort(ipFinder1.getMulticastPort());
@@ -81,21 +87,13 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002)));
ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003)));
- for (int i = 0; i < 5; i++) {
- Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses();
- Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses();
- Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses();
-
- if (addrs1.size() != 1 || addrs2.size() != 2 || addrs3.size() != 3) {
- info("Addrs1: " + addrs1);
- info("Addrs2: " + addrs2);
- info("Addrs2: " + addrs3);
-
- Thread.sleep(1000);
- }
- else
- break;
- }
+ Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses();
+ Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses();
+ Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses();
+
+ info("Addrs1: " + addrs1);
+ info("Addrs2: " + addrs2);
+ info("Addrs2: " + addrs3);
assertEquals(1, ipFinder1.getRegisteredAddresses().size());
assertEquals(2, ipFinder2.getRegisteredAddresses().size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/80147128/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 41d4b4a..3e41979 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1120,16 +1120,31 @@ public abstract class GridAbstractTest extends TestCase {
if (gridName != null && gridName.matches(".*\\d")) {
String idStr = UUID.randomUUID().toString();
- char[] chars = idStr.toCharArray();
+ if (gridName.startsWith(getTestGridName())) {
+ String idxStr = String.valueOf(getTestGridIndex(gridName));
- chars[0] = gridName.charAt(gridName.length() - 1);
- chars[1] = '0';
+ while (idxStr.length() < 5)
+ idxStr = '0' + idxStr;
- chars[chars.length - 3] = '0';
- chars[chars.length - 2] = '0';
- chars[chars.length - 1] = gridName.charAt(gridName.length() - 1);
+ char[] chars = idStr.toCharArray();
- cfg.setNodeId(UUID.fromString(new String(chars)));
+ for (int i = 0; i < idxStr.length(); i++)
+ chars[chars.length - idxStr.length() + i] = idxStr.charAt(i);
+
+ cfg.setNodeId(UUID.fromString(new String(chars)));
+ }
+ else {
+ char[] chars = idStr.toCharArray();
+
+ chars[0] = gridName.charAt(gridName.length() - 1);
+ chars[1] = '0';
+
+ chars[chars.length - 3] = '0';
+ chars[chars.length - 2] = '0';
+ chars[chars.length - 1] = gridName.charAt(gridName.length() - 1);
+
+ cfg.setNodeId(UUID.fromString(new String(chars)));
+ }
}
if (isMultiJvm())