You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/06/16 11:35:24 UTC
[30/50] [abbrv] incubator-ignite git commit: # ignite-883
# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8870a177
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8870a177
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8870a177
Branch: refs/heads/ignite-960
Commit: 8870a177acaa16f6757615cab6017b21aa274c01
Parents: 16f3d32
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 12 10:05:22 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 12 17:20:14 2015 +0300
----------------------------------------------------------------------
.../communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 232 ++++++++++++-------
.../ignite/spi/discovery/tcp/ServerImpl.java | 170 +++++++++++---
.../ipfinder/TcpDiscoveryIpFinderAdapter.java | 34 ++-
.../TcpDiscoveryMulticastIpFinder.java | 19 +-
.../messages/TcpDiscoveryAbstractMessage.java | 10 +-
.../distributed/IgniteCacheManyClientsTest.java | 65 ++++--
.../tcp/TcpClientDiscoverySpiSelfTest.java | 42 +++-
8 files changed, 412 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index f19e25b..9e38788 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2028,7 +2028,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
- "configuration property) [addr=" + addr + ", connTimeout" + connTimeout + ']');
+ "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
if (errs == null)
errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 23e6f88..d8108e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -311,7 +311,7 @@ class ClientImpl extends TcpDiscoveryImpl {
for (ClusterNode n : rmts) {
rmtNodes.remove(n.id());
- Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+ Collection<ClusterNode> top = updateTopologyHistory(topVer + 1, null);
lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
}
@@ -348,13 +348,14 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param recon {@code True} if reconnects.
+ * @param timeout Timeout.
* @return Opened socket or {@code null} if timeout.
* @throws InterruptedException If interrupted.
* @throws IgniteSpiException If failed.
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
- @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
+ @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
Collection<InetSocketAddress> addrs = null;
long startTime = U.currentTimeMillis();
@@ -371,11 +372,11 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Resolved addresses from IP finder: " + addrs);
}
else {
- U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
-
- if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+ if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
return null;
+ U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
+
Thread.sleep(2000);
}
}
@@ -421,13 +422,13 @@ class ClientImpl extends TcpDiscoveryImpl {
}
if (addrs.isEmpty()) {
- if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+ if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
return null;
- Thread.sleep(2000);
-
U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
"in 2000ms): " + addrs0);
+
+ Thread.sleep(2000);
}
}
}
@@ -541,16 +542,17 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param topVer New topology version.
+ * @param msg Discovery message.
* @return Latest topology snapshot.
*/
- private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+ private NavigableSet<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) {
this.topVer = topVer;
NavigableSet<ClusterNode> allNodes = allVisibleNodes();
if (!topHist.containsKey(topVer)) {
assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
- "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+ "lastVer=" + topHist.lastKey() + ", newVer=" + topVer + ", locNode=" + locNode + ", msg=" + msg;
topHist.put(topVer, allNodes);
@@ -619,6 +621,17 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ * @param err Error.
+ */
+ private void joinError(IgniteSpiException err) {
+ assert err != null;
+
+ joinErr = err;
+
+ joinLatch.countDown();
+ }
+
+ /**
* Heartbeat sender.
*/
private class HeartbeatSender extends TimerTask {
@@ -727,7 +740,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onMessageReceived(msg);
- if (spi.ensured(msg))
+ if (spi.ensured(msg) && joinLatch.getCount() == 0L)
lastMsgId = msg.id();
msgWorker.addMessage(msg);
@@ -866,11 +879,16 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private volatile Socket sock;
+ /** */
+ private boolean join;
+
/**
- *
+ * @param join {@code True} if reconnects during join.
*/
- protected Reconnector() {
+ protected Reconnector(boolean join) {
super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
+
+ this.join = join;
}
/**
@@ -888,51 +906,94 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean success = false;
+ Exception err = null;
+
+ long timeout = join ? spi.joinTimeout : spi.netTimeout;
+
+ long startTime = U.currentTimeMillis();
+
try {
- sock = joinTopology(true);
+ while (true) {
+ sock = joinTopology(true, timeout);
- if (sock == null) {
- U.error(log, "Failed to reconnect to cluster: timeout.");
+ if (sock == null) {
+ if (join) {
+ joinError(new IgniteSpiException("Join process timed out, connection failed and " +
+ "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
+ "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ }
+ else
+ U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
+ "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
- return;
- }
+ return;
+ }
- if (isInterrupted())
- throw new InterruptedException();
+ if (isInterrupted())
+ throw new InterruptedException();
- InputStream in = new BufferedInputStream(sock.getInputStream());
+ int oldTimeout = 0;
- sock.setKeepAlive(true);
- sock.setTcpNoDelay(true);
+ try {
+ oldTimeout = sock.getSoTimeout();
- // Wait for
- while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+ sock.setSoTimeout((int)spi.netTimeout);
- if (msg instanceof TcpDiscoveryClientReconnectMessage) {
- TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+ InputStream in = new BufferedInputStream(sock.getInputStream());
- if (res.creatorNodeId().equals(getLocalNodeId())) {
- if (res.success()) {
- msgWorker.addMessage(res);
+ sock.setKeepAlive(true);
+ sock.setTcpNoDelay(true);
- success = true;
- }
+ // Wait for the reconnect response addressed to the local node.
+ while (!isInterrupted()) {
+ TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
- break;
+ if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+ TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+
+ if (res.creatorNodeId().equals(getLocalNodeId())) {
+ if (res.success()) {
+ msgWorker.addMessage(res);
+
+ success = true;
+ }
+
+ return;
+ }
+ }
}
}
+ catch (IOException | IgniteCheckedException e) {
+ U.closeQuiet(sock);
+
+ if (log.isDebugEnabled())
+ log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
+ if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+ throw e;
+ else
+ U.warn(log, "Failed to reconnect to cluster (will retry): " + e);
+ }
+ finally {
+ if (success)
+ sock.setSoTimeout(oldTimeout);
+ }
}
}
catch (IOException | IgniteCheckedException e) {
+ err = e;
+
U.error(log, "Failed to reconnect", e);
}
finally {
if (!success) {
U.closeQuiet(sock);
- msgWorker.addMessage(SPI_RECONNECT_FAILED);
+ if (join)
+ joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
+ "to reconnect.", err));
+ else
+ msgWorker.addMessage(SPI_RECONNECT_FAILED);
}
}
}
@@ -967,7 +1028,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onJoinStarted();
try {
- final Socket sock = joinTopology(false);
+ final Socket sock = joinTopology(false, spi.joinTimeout);
if (sock == null) {
joinErr = new IgniteSpiException("Join process timed out.");
@@ -981,12 +1042,14 @@ class ClientImpl extends TcpDiscoveryImpl {
sockWriter.setSocket(sock);
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (joinLatch.getCount() > 0)
- queue.add(JOIN_TIMEOUT);
- }
- }, spi.netTimeout);
+ if (spi.joinTimeout > 0) {
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (joinLatch.getCount() > 0)
+ queue.add(JOIN_TIMEOUT);
+ }
+ }, spi.joinTimeout);
+ }
sockReader.setSocket(sock, locNode.clientRouterNodeId());
@@ -996,8 +1059,8 @@ class ClientImpl extends TcpDiscoveryImpl {
if (msg == JOIN_TIMEOUT) {
if (joinLatch.getCount() > 0) {
joinErr = new IgniteSpiException("Join process timed out, did not receive response for " +
- "join request (consider increasing 'networkTimeout' configuration property) " +
- "[networkTimeout=" + spi.netTimeout + ", sock=" + sock +']');
+ "join request (consider increasing 'joinTimeout' configuration property) " +
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock +']');
joinLatch.countDown();
@@ -1021,30 +1084,23 @@ class ClientImpl extends TcpDiscoveryImpl {
if (((SocketClosedMessage)msg).sock == currSock) {
currSock = null;
- if (joinLatch.getCount() > 0) {
- joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+ boolean join = joinLatch.getCount() > 0;
- joinLatch.countDown();
+ if (spi.getSpiContext().isStopping() || segmented) {
+ leaveLatch.countDown();
- break;
+ if (join) {
+ joinError(new IgniteSpiException("Failed to connect to cluster: socket closed."));
+
+ break;
+ }
}
else {
- if (spi.getSpiContext().isStopping() || segmented)
- leaveLatch.countDown();
- else {
- assert reconnector == null;
-
- final Reconnector reconnector = new Reconnector();
- this.reconnector = reconnector;
- reconnector.start();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (reconnector.isAlive())
- reconnector.cancel();
- }
- }, spi.netTimeout);
- }
+ assert reconnector == null;
+
+ final Reconnector reconnector = new Reconnector(join);
+ this.reconnector = reconnector;
+ reconnector.start();
}
}
}
@@ -1208,7 +1264,7 @@ class ClientImpl extends TcpDiscoveryImpl {
locNode.order(topVer);
- notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer));
+ notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
joinErr = null;
@@ -1230,6 +1286,14 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
+ if (!topHist.isEmpty() && msg.topologyVersion() <= topHist.lastKey()) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node add finished message since topology already updated " +
+ "[msg=" + msg + ", lastHistKey=" + topHist.lastKey() + ", node=" + node + ']');
+
+ return;
+ }
+
long topVer = msg.topologyVersion();
node.order(topVer);
@@ -1238,7 +1302,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (spi.locNodeVer.equals(node.version()))
node.version(spi.locNodeVer);
- NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
+ NavigableSet<ClusterNode> top = updateTopologyHistory(topVer, msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1276,7 +1340,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+ NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1319,7 +1383,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+ NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1376,28 +1440,32 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
- assert msg.success();
+ assert msg.success() : msg;
- currSock = reconnector.sock;
+ if (reconnector != null) {
+ currSock = reconnector.sock;
- sockWriter.setSocket(currSock);
- sockReader.setSocket(currSock, locNode.clientRouterNodeId());
+ sockWriter.setSocket(currSock);
+ sockReader.setSocket(currSock, locNode.clientRouterNodeId());
- reconnector = null;
+ reconnector = null;
- pending = true;
+ pending = true;
- try {
- for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
- if (log.isDebugEnabled())
- log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
+ try {
+ for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+ if (log.isDebugEnabled())
+ log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
- processDiscoveryMessage(pendingMsg);
+ processDiscoveryMessage(pendingMsg);
+ }
+ }
+ finally {
+ pending = false;
}
}
- finally {
- pending = false;
- }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding reconnect message, reconnect is completed: " + msg);
}
else if (log.isDebugEnabled())
log.debug("Discarding reconnect message for another client: " + msg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 65bea9f..9041557 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -41,7 +41,6 @@ import org.jsr166.*;
import java.io.*;
import java.net.*;
-import java.text.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -1192,7 +1191,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node.id().equals(destNodeId)) {
Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
- Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(allNodes.size());
+ Collection<TcpDiscoveryNode> topToSnd = new ArrayList<>(allNodes.size());
for (TcpDiscoveryNode n0 : allNodes) {
assert n0.internalOrder() != 0 : n0;
@@ -1202,10 +1201,10 @@ class ServerImpl extends TcpDiscoveryImpl {
// There will be separate messages for nodes with greater
// internal order.
if (n0.internalOrder() < nodeAddedMsg.node().internalOrder())
- topToSend.add(n0);
+ topToSnd.add(n0);
}
- nodeAddedMsg.topology(topToSend);
+ nodeAddedMsg.topology(topToSnd);
nodeAddedMsg.messages(msgs, discardMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1646,6 +1645,108 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Discovery messages history used for client reconnect.
+ */
+ private class EnsuredMessageHistory {
+ /** */
+ private static final int MAX = 1024;
+
+ /** Pending messages. */
+ private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+
+ /**
+ * @param msg Message to add.
+ */
+ void add(TcpDiscoveryAbstractMessage msg) {
+ assert spi.ensured(msg) : msg;
+
+ msgs.addLast(msg);
+
+ while (msgs.size() > MAX)
+ msgs.pollFirst();
+ }
+
+ /**
+ * Gets messages starting from provided ID (exclusive). If such
+ * message is not found, {@code null} is returned (this indicates
+ * a failure condition when it was already removed from queue).
+ *
+ * @param lastMsgId Last message ID received on client. {@code Null} if client did not finish connect procedure.
+ * @param node Client node.
+ * @return Collection of messages.
+ */
+ @Nullable Collection<TcpDiscoveryAbstractMessage> messages(@Nullable IgniteUuid lastMsgId,
+ TcpDiscoveryNode node)
+ {
+ assert node != null && node.isClient() : node;
+
+ if (lastMsgId == null) {
+ // Client connection failed before it received TcpDiscoveryNodeAddedMessage.
+ List<TcpDiscoveryAbstractMessage> res = null;
+
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id()))
+ res = new ArrayList<>(msgs.size());
+ }
+
+ if (res != null)
+ res.add(prepare(msg, node.id()));
+ }
+
+ if (log.isDebugEnabled()) {
+ if (res == null)
+ log.debug("Failed to find node added message [node=" + node + ']');
+ else
+ log.debug("Found add added message [node=" + node + ", hist=" + res + ']');
+ }
+
+ return res;
+ }
+ else {
+ if (msgs.isEmpty())
+ return Collections.emptyList();
+
+ Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+
+ boolean skip = true;
+
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (skip) {
+ if (msg.id().equals(lastMsgId))
+ skip = false;
+ }
+ else
+ cp.add(prepare(msg, node.id()));
+ }
+
+ cp = !skip ? cp : null;
+
+ if (log.isDebugEnabled()) {
+ if (cp == null)
+ log.debug("Failed to find messages history [node=" + node + ", lastMsgId" + lastMsgId + ']');
+ else
+ log.debug("Found messages history [node=" + node + ", hist=" + cp + ']');
+ }
+
+ return cp;
+ }
+ }
+
+ /**
+ * @param msg Message.
+ * @param destNodeId Client node ID.
+ * @return Prepared message.
+ */
+ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ prepareNodeAddedMessage(msg, destNodeId, null, null);
+
+ return msg;
+ }
+ }
+
+ /**
* Pending messages container.
*/
private static class PendingMessages {
@@ -1678,33 +1779,27 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Gets messages starting from provided ID (exclusive). If such
- * message is not found, {@code null} is returned (this indicates
- * a failure condition when it was already removed from queue).
+ * Resets pending messages.
*
- * @param lastMsgId Last message ID.
- * @return Collection of messages.
+ * @param msgs Messages.
+ * @param discardId Discarded message ID.
*/
- @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) {
- assert lastMsgId != null;
-
- if (msgs.isEmpty())
- return Collections.emptyList();
+ void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+ this.msgs.clear();
- Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+ if (msgs != null)
+ this.msgs.addAll(msgs);
- boolean skip = true;
+ this.discardId = discardId;
+ }
- for (TcpDiscoveryAbstractMessage msg : msgs) {
- if (skip) {
- if (msg.id().equals(lastMsgId))
- skip = false;
- }
- else
- cp.add(msg);
- }
+ /**
+ * Clears pending messages.
+ */
+ void clear() {
+ msgs.clear();
- return !skip ? cp : null;
+ discardId = null;
}
/**
@@ -1728,6 +1823,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending messages. */
private final PendingMessages pendingMsgs = new PendingMessages();
+ /** Messages history used for client reconnect. */
+ private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
+
/** Last message that updated topology. */
private TcpDiscoveryAbstractMessage lastMsg;
@@ -1794,6 +1892,9 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
+ if (spi.ensured(msg))
+ msgHist.add(msg);
+
spi.stats.onMessageProcessingFinished(msg);
}
@@ -2130,6 +2231,8 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
+ log.error("Will resend [msg=" + msg + ", e=" + e + ']');
+
if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
ackTimeout0 *= 2;
@@ -2619,11 +2722,15 @@ class ServerImpl extends TcpDiscoveryImpl {
node.aliveCheck(spi.maxMissedClientHbs);
if (isLocalNodeCoordinator()) {
- Collection<TcpDiscoveryAbstractMessage> pending = pendingMsgs.messages(msg.lastMessageId());
+ Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
if (pending != null) {
msg.pendingMessages(pending);
msg.success(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Accept client reconnect, restored pending messages " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
}
else {
if (log.isDebugEnabled())
@@ -2836,7 +2943,8 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.discard(msg.discardedMessageId());
+ // Restore pending messages.
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
// Clear data to minimize message size.
msg.messages(null, null);
@@ -3094,6 +3202,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Removed node from topology: " + leftNode);
+ // Clear pending messages if there are no remote nodes left in the ring.
+ if (!ring.hasRemoteNodes())
+ pendingMsgs.clear();
+
long topVer;
if (locNodeCoord) {
@@ -3257,6 +3369,10 @@ class ServerImpl extends TcpDiscoveryImpl {
assert node != null;
+ // Clear pending messages if there are no remote nodes left in the ring.
+ if (!ring.hasRemoteNodes())
+ pendingMsgs.clear();
+
long topVer;
if (locNodeCoord) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
index 99a2cdc..4d62ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java
@@ -17,9 +17,13 @@
package org.apache.ignite.spi.discovery.tcp.ipfinder;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import java.net.*;
import java.util.*;
@@ -35,6 +39,11 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
@GridToStringExclude
private volatile IgniteSpiContext spiCtx;
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ @GridToStringExclude
+ protected Ignite ignite;
+
/** {@inheritDoc} */
@Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException {
this.spiCtx = spiCtx;
@@ -47,7 +56,8 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
/** {@inheritDoc} */
@Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
- registerAddresses(addrs);
+ if (!discoveryClientMode())
+ registerAddresses(addrs);
}
/** {@inheritDoc} */
@@ -77,6 +87,28 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde
}
/**
+ * @return {@code True} if TCP discovery works in client mode.
+ */
+ protected boolean discoveryClientMode() {
+ boolean clientMode;
+
+ Ignite ignite0 = ignite;
+
+ if (ignite0 != null) { // Can be null if used in tests without starting Ignite.
+ DiscoverySpi discoSpi = ignite0.configuration().getDiscoverySpi();
+
+ if (!(discoSpi instanceof TcpDiscoverySpi))
+ throw new IgniteSpiException("TcpDiscoveryIpFinder should be used with TcpDiscoverySpi: " + discoSpi);
+
+ clientMode = ignite0.configuration().isClientMode() && !((TcpDiscoverySpi)discoSpi).isForceServerMode();
+ }
+ else
+ clientMode = false;
+
+ return clientMode;
+ }
+
+ /**
* @return SPI context.
*/
protected IgniteSpiContext spiContext() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index a992620..8e5a1fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -85,11 +85,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
@LoggerResource
private IgniteLogger log;
- /** Ignite instance . */
- @IgniteInstanceResource
- @GridToStringExclude
- private Ignite ignite;
-
/** Multicast IP address as string. */
private String mcastGrp = DFLT_MCAST_GROUP;
@@ -256,19 +251,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
"(it is recommended in production to specify at least one address in " +
"TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)");
- boolean clientMode;
-
- if (ignite != null) { // Can be null if used in tests without starting Ignite.
- DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi();
-
- if (!(discoSpi instanceof TcpDiscoverySpi))
- throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " +
- "TcpDiscoverySpi: " + discoSpi);
-
- clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode();
- }
- else
- clientMode = false;
+ boolean clientMode = discoveryClientMode();
InetAddress mcastAddr;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 145b518..21dbf4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -41,7 +41,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
/** Sender of the message (transient). */
- private transient UUID senderNodeId;
+ private transient UUID sndNodeId;
/** Message ID. */
private IgniteUuid id;
@@ -99,16 +99,16 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
* @return Sender node ID.
*/
public UUID senderNodeId() {
- return senderNodeId;
+ return sndNodeId;
}
/**
* Sets sender node ID.
*
- * @param senderNodeId Sender node ID.
+ * @param sndNodeId Sender node ID.
*/
- public void senderNodeId(UUID senderNodeId) {
- this.senderNodeId = senderNodeId;
+ public void senderNodeId(UUID sndNodeId) {
+ this.sndNodeId = sndNodeId;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 77ddd40..4fb4387 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -63,18 +63,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(2 * 60_000);
if (!clientDiscovery)
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
cfg.setClientMode(client);
- if (client) {
-// cfg.setPublicThreadPoolSize(1);
-// cfg.setPeerClassLoadingThreadPoolSize(1);
-// cfg.setIgfsThreadPoolSize(1);
- }
-
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(PARTITIONED);
@@ -197,43 +192,62 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
- try (Ignite ignite = startGrid(idx.getAndIncrement())) {
- log.info("Started node: " + ignite.name());
+ boolean counted = false;
- assertTrue(ignite.configuration().isClientMode());
+ try {
+ int nodeIdx = idx.getAndIncrement();
- IgniteCache<Object, Object> cache = ignite.cache(null);
+ Thread.currentThread().setName("client-thread-node-" + nodeIdx);
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ try (Ignite ignite = startGrid(nodeIdx)) {
+ log.info("Started node: " + ignite.name());
- int iter = 0;
+ assertTrue(ignite.configuration().isClientMode());
- Integer key = rnd.nextInt(0, 1000);
+ IgniteCache<Object, Object> cache = ignite.cache(null);
- cache.put(key, iter++);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- assertNotNull(cache.get(key));
+ int iter = 0;
- latch.countDown();
-
- while (!stop.get()) {
- key = rnd.nextInt(0, 1000);
+ Integer key = rnd.nextInt(0, 1000);
cache.put(key, iter++);
assertNotNull(cache.get(key));
- Thread.sleep(1);
+ latch.countDown();
+
+ counted = true;
+
+ while (!stop.get()) {
+ key = rnd.nextInt(0, 1000);
+
+ cache.put(key, iter++);
+
+ assertNotNull(cache.get(key));
+
+ Thread.sleep(1);
+ }
+
+ log.info("Stopping node: " + ignite.name());
}
- log.info("Stopping node: " + ignite.name());
+ return null;
}
+ catch (Throwable e) {
+ log.error("Unexpected error in client thread: " + e, e);
- return null;
+ throw e;
+ }
+ finally {
+ if (!counted)
+ latch.countDown();
+ }
}
}, THREADS, "client-thread");
- latch.await();
+ assertTrue(latch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
log.info("All clients started.");
@@ -245,6 +259,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
fut.get();
}
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ throw e;
+ }
finally {
stop.set(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 44fe299..8147958 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -193,8 +193,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
- *
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testJoinTimeout() throws Exception {
clientIpFinder = new TcpDiscoveryVmIpFinder();
@@ -544,8 +543,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testClientReconnectTopologyChange2() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-998");
-
maxMissedClientHbs = 100;
clientsPerSrv = 1;
@@ -1001,6 +998,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testJoinError2() throws Exception {
+ startServerNodes(1);
+
+ Ignite ignite = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+ srvSpi.failNodeAddedMessage();
+ srvSpi.failClientReconnectMessage();
+
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
@@ -1251,6 +1266,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private AtomicInteger failNodeAdded = new AtomicInteger();
+ /** */
+ private AtomicInteger failClientReconnect = new AtomicInteger();
+
/**
* @param lock Lock.
*/
@@ -1276,6 +1294,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * Makes the next write of a client reconnect message fail by closing the socket.
+ */
+ void failClientReconnectMessage() {
+ failClientReconnect.set(1);
+ }
+
+ /**
* @param isPause Is lock.
* @param locks Locks.
*/
@@ -1293,7 +1318,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
waitFor(writeLock);
- if (msg instanceof TcpDiscoveryNodeAddedMessage && failNodeAdded.getAndDecrement() > 0) {
+ boolean fail = false;
+
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ fail = failNodeAdded.getAndDecrement() > 0;
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage)
+ fail = failClientReconnect.getAndDecrement() > 0;
+
+ if (fail) {
log.info("Close socket on message write [msg=" + msg + "]");
sock.close();