You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 15:40:24 UTC
incubator-ignite git commit: # ignite-883
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-883 [created] de0d61f6a
# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/de0d61f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/de0d61f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/de0d61f6
Branch: refs/heads/ignite-883
Commit: de0d61f6a3467379c168a15bbd9c4580aeebb092
Parents: 89a4f7c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 11 10:40:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 11 17:40:01 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 1 -
.../discovery/GridDiscoveryManager.java | 9 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 28 ++-
.../communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 214 ++++++++++++-------
.../ignite/spi/discovery/tcp/ServerImpl.java | 64 ------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 66 ++++++
.../distributed/IgniteCacheManyClientsTest.java | 87 +++++++-
9 files changed, 323 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4f5e365..f38fee1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1001,7 +1001,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
A.notNull(cfg.getMBeanServer(), "cfg.getMBeanServer()");
A.notNull(cfg.getGridLogger(), "cfg.getGridLogger()");
A.notNull(cfg.getMarshaller(), "cfg.getMarshaller()");
- A.notNull(cfg.getPublicThreadPoolSize(), "cfg.getPublicThreadPoolSize()");
A.notNull(cfg.getUserAttributes(), "cfg.getUserAttributes()");
// All SPIs should be non-null.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 71fbc61..464110c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2057,17 +2057,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final AffinityTopologyVersion topVer;
/** */
+ @GridToStringExclude
private final DiscoCache discoCache;
/**
* @param topVer Topology version.
* @param discoCache Disco cache.
*/
- private Snapshot(AffinityTopologyVersion topVer,
- DiscoCache discoCache) {
+ private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
this.topVer = topVer;
this.discoCache = discoCache;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Snapshot.class, this);
+ }
}
/** Cache for discovery collections. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 51010ce..0355bb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -102,7 +102,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
boolean set = topVer.setIfGreater(e.topologyVersion());
assert set : "Have you configured TcpDiscoverySpi for your in-memory data grid? [newVer=" +
- e.topologyVersion() + ", curVer=" + topVer.get() + ']';
+ e.topologyVersion() + ", curVer=" + topVer.get() + ", evt=" + e + ']';
if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 6e7a706..476f8a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -585,8 +585,32 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
GridDummySpiContext(ClusterNode locNode, boolean stopping, @Nullable IgniteSpiContext spiCtx) {
this.locNode = locNode;
this.stopping = stopping;
- this.msgFactory = spiCtx != null ? spiCtx.messageFactory() : null;
- this.msgFormatter = spiCtx != null ? spiCtx.messageFormatter() : null;
+
+ MessageFactory msgFactory0 = spiCtx != null ? spiCtx.messageFactory() : null;
+ MessageFormatter msgFormatter0 = spiCtx != null ? spiCtx.messageFormatter() : null;
+
+ if (msgFactory0 == null) {
+ msgFactory0 = new MessageFactory() {
+ @Nullable @Override public Message create(byte type) {
+ throw new IgniteException("Failed to read message, node is not started.");
+ }
+ };
+ }
+
+ if (msgFormatter0 == null) {
+ msgFormatter0 = new MessageFormatter() {
+ @Override public MessageWriter writer() {
+ throw new IgniteException("Failed to write message, node is not started.");
+ }
+
+ @Override public MessageReader reader(MessageFactory factory) {
+ throw new IgniteException("Failed to read message, node is not started.");
+ }
+ };
+ }
+
+ this.msgFactory = msgFactory0;
+ this.msgFormatter = msgFormatter0;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a661965..f19e25b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2028,7 +2028,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
- "configuration property) [addr=" + addr + ']');
+ "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
if (errs == null)
errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d064c8d..23297ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -195,7 +195,7 @@ class ClientImpl extends TcpDiscoveryImpl {
U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
}
catch (InterruptedException ignored) {
-
+ // No-op.
}
}
@@ -282,7 +282,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return false;
}
catch (IgniteCheckedException e) {
- throw new IgniteSpiException(e); // Should newer occur
+ throw new IgniteSpiException(e); // Should never occur.
}
}
@@ -347,7 +347,10 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ * @param recon {@code True} if reconnects.
* @return Opened socket or {@code null} if timeout.
+ * @throws InterruptedException If interrupted.
+ * @throws IgniteSpiException If failed.
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
@@ -387,71 +390,152 @@ class ClientImpl extends TcpDiscoveryImpl {
InetSocketAddress addr = it.next();
- Socket sock = null;
+ T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr);
- try {
- long ts = U.currentTimeMillis();
+ if (sockAndRes == null) {
+ it.remove();
- IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+ continue;
+ }
- sock = t.get1();
+ assert sockAndRes.get1() != null;
+ assert sockAndRes.get2() != null;
- UUID rmtNodeId = t.get2();
+ Socket sock = sockAndRes.get1();
- spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+ switch (sockAndRes.get2()) {
+ case RES_OK:
+ return sock;
- locNode.clientRouterNodeId(rmtNodeId);
+ case RES_CONTINUE_JOIN:
+ case RES_WAIT:
+ U.closeQuiet(sock);
- TcpDiscoveryAbstractMessage msg = recon ?
- new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
- lastMsgId) :
- new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+ break;
- msg.client(true);
+ default:
+ if (log.isDebugEnabled())
+ log.debug("Received unexpected response to join request: " + sockAndRes.get2());
- spi.writeToSocket(sock, msg);
+ U.closeQuiet(sock);
+ }
+ }
- int res = spi.readReceipt(sock, spi.ackTimeout);
+ if (addrs.isEmpty()) {
+ if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+ return null;
- switch (res) {
- case RES_OK:
- return sock;
+ Thread.sleep(2000);
- case RES_CONTINUE_JOIN:
- case RES_WAIT:
- U.closeQuiet(sock);
+ U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+ "in 2000ms): " + addrs0);
+ }
+ }
+ }
- break;
+ /**
+ * @param recon {@code True} if reconnects.
+ * @param addr Address.
+ * @return Socket and connect response or {@code null} if all attempts to join through the address failed.
+ */
+ @Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+ assert addr != null;
- default:
- if (log.isDebugEnabled())
- log.debug("Received unexpected response to join request: " + res);
+ Collection<Throwable> errs = null;
- U.closeQuiet(sock);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Failed to establish connection with address: " + addr, e);
+ long ackTimeout0 = spi.ackTimeout;
- U.closeQuiet(sock);
+ int connectAttempts = 1;
- it.remove();
- }
+ UUID locNodeId = getLocalNodeId();
+
+ for (int i = 0; i < spi.reconCnt; i++) {
+ boolean openSock = false;
+
+ Socket sock = null;
+
+ try {
+ long tstamp = U.currentTimeMillis();
+
+ sock = spi.openSocket(addr);
+
+ openSock = true;
+
+ TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+ req.client(true);
+
+ spi.writeToSocket(sock, req);
+
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+
+ UUID rmtNodeId = res.creatorNodeId();
+
+ assert rmtNodeId != null;
+ assert !getLocalNodeId().equals(rmtNodeId);
+
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+ locNode.clientRouterNodeId(rmtNodeId);
+
+ tstamp = U.currentTimeMillis();
+
+ TcpDiscoveryAbstractMessage msg = recon ?
+ new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) :
+ new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+
+ msg.client(true);
+
+ spi.writeToSocket(sock, msg);
+
+ spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
+ ", rmtNodeId=" + rmtNodeId + ']');
+
+ return new T2<>(sock, spi.readReceipt(sock, ackTimeout0));
}
+ catch (IOException | IgniteCheckedException e) {
+ U.closeQuiet(sock);
- if (addrs.isEmpty()) {
- U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
- "in 2000ms): " + addrs0);
+ if (log.isDebugEnabled())
+ log.error("Exception on joining: " + e.getMessage(), e);
- if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
- return null;
+ onException("Exception on joining: " + e.getMessage(), e);
- Thread.sleep(2000);
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+
+ if (!openSock) {
+ // Reconnect for the second time, if connection is not established.
+ if (connectAttempts < 2) {
+ connectAttempts++;
+
+ continue;
+ }
+
+ break; // Don't retry if we can not establish connection.
+ }
+
+ if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ ackTimeout0 *= 2;
+
+ if (!checkAckTimeout(ackTimeout0))
+ break;
+ }
}
}
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to join to address [addr=" + addr + ", recon=" + recon + ", errs=" + errs + ']');
+
+ return null;
}
+
/**
* @param topVer New topology version.
* @return Latest topology snapshot.
@@ -493,33 +577,6 @@ class ClientImpl extends TcpDiscoveryImpl {
return allNodes;
}
- /**
- * @param addr Address.
- * @return Remote node ID.
- * @throws IOException In case of I/O error.
- * @throws IgniteCheckedException In case of other error.
- */
- private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
- assert addr != null;
-
- Socket sock = spi.openSocket(addr);
-
- TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
-
- req.client(true);
-
- spi.writeToSocket(sock, req);
-
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout);
-
- UUID nodeId = res.creatorNodeId();
-
- assert nodeId != null;
- assert !getLocalNodeId().equals(nodeId);
-
- return F.t(sock, nodeId);
- }
-
/** {@inheritDoc} */
@Override void simulateNodeFailure() {
U.warn(log, "Simulating client node failure: " + getLocalNodeId());
@@ -736,7 +793,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- *
+ * @return {@code True} if connection is alive.
*/
public boolean isOnline() {
synchronized (mux) {
@@ -780,7 +837,8 @@ class ClientImpl extends TcpDiscoveryImpl {
}
catch (IOException e) {
if (log.isDebugEnabled())
- U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e);
+ U.error(log, "Failed to send node left message (will stop anyway) " +
+ "[sock=" + sock + ", msg=" + msg + ']', e);
U.closeQuiet(sock);
@@ -909,7 +967,7 @@ class ClientImpl extends TcpDiscoveryImpl {
final Socket sock = joinTopology(false);
if (sock == null) {
- joinErr = new IgniteSpiException("Join process timed out");
+ joinErr = new IgniteSpiException("Join process timed out.");
joinLatch.countDown();
@@ -934,8 +992,9 @@ class ClientImpl extends TcpDiscoveryImpl {
if (msg == JOIN_TIMEOUT) {
if (joinLatch.getCount() > 0) {
- joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
- ", timeout=" + spi.netTimeout + ']');
+ joinErr = new IgniteSpiException("Join process timed out, did not receive response for " +
+ "join request (consider increasing 'networkTimeout' configuration property) " +
+ "[networkTimeout=" + spi.netTimeout + ", sock=" + sock +']');
joinLatch.countDown();
@@ -1027,7 +1086,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (joinLatch.getCount() > 0) {
// This should not occurs.
- joinErr = new IgniteSpiException("Some error occurs in joinig process");
+ joinErr = new IgniteSpiException("Some error occur in join process.");
joinLatch.countDown();
}
@@ -1236,8 +1295,9 @@ class ClientImpl extends TcpDiscoveryImpl {
if (spi.getSpiContext().isStopping()) {
if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
if (leaveLatch.getCount() > 0) {
- log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
- + ", rmtNode=" + msg.creatorNodeId() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
+ + ", rmtNode=" + msg.creatorNodeId() + ']');
leaveLatch.countDown();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5aceaae..311c783 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -125,16 +125,6 @@ class ServerImpl extends TcpDiscoveryImpl {
private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
new ConcurrentHashMap8<>();
- /** Debug mode. */
- private boolean debugMode;
-
- /** Debug messages history. */
- private int debugMsgHist = 512;
-
- /** Received messages. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private ConcurrentLinkedDeque<String> debugLog;
-
/**
* @param adapter Adapter.
*/
@@ -142,24 +132,6 @@ class ServerImpl extends TcpDiscoveryImpl {
super(adapter);
}
- /**
- * This method is intended for troubleshooting purposes only.
- *
- * @param debugMode {code True} to start SPI in debug mode.
- */
- public void setDebugMode(boolean debugMode) {
- this.debugMode = debugMode;
- }
-
- /**
- * This method is intended for troubleshooting purposes only.
- *
- * @param debugMsgHist Message history log size.
- */
- public void setDebugMessageHistory(int debugMsgHist) {
- this.debugMsgHist = debugMsgHist;
- }
-
/** {@inheritDoc} */
@Override public String getSpiState() {
synchronized (mux) {
@@ -1060,23 +1032,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @param ackTimeout Acknowledgement timeout.
- * @return {@code True} if acknowledgement timeout is less or equal to
- * maximum acknowledgement timeout, {@code false} otherwise.
- */
- private boolean checkAckTimeout(long ackTimeout) {
- if (ackTimeout > spi.maxAckTimeout) {
- LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
- "(consider increasing 'maxAckTimeout' configuration property) " +
- "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
-
- return false;
- }
-
- return true;
- }
-
- /**
* Notify external listener on discovery event.
*
* @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details.
@@ -1422,25 +1377,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @param msg Message.
- */
- private void debugLog(String msg) {
- assert debugMode;
-
- String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
- '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
- "-" + locNode.internalOrder() + "] " +
- msg;
-
- debugLog.add(msg0);
-
- int delta = debugLog.size() - debugMsgHist;
-
- for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
- debugLog.poll();
- }
-
- /**
- * @param msg Message.
* @return {@code True} if recordable in debug mode.
*/
private boolean recordable(TcpDiscoveryAbstractMessage msg) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index b7e9e53..94097c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -26,7 +26,9 @@ import org.apache.ignite.spi.discovery.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
import org.jetbrains.annotations.*;
+import java.text.*;
import java.util.*;
+import java.util.concurrent.*;
/**
*
@@ -50,6 +52,16 @@ abstract class TcpDiscoveryImpl {
/** */
protected TcpDiscoveryNode locNode;
+ /** Debug mode. */
+ protected boolean debugMode;
+
+ /** Debug messages history. */
+ private int debugMsgHist = 512;
+
+ /** Received messages. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ protected ConcurrentLinkedDeque<String> debugLog;
+
/**
* @param spi Adapter.
*/
@@ -60,6 +72,43 @@ abstract class TcpDiscoveryImpl {
}
/**
+ * This method is intended for troubleshooting purposes only.
+ *
+ * @param debugMode {@code True} to start SPI in debug mode.
+ */
+ public void setDebugMode(boolean debugMode) {
+ this.debugMode = debugMode;
+ }
+
+ /**
+ * This method is intended for troubleshooting purposes only.
+ *
+ * @param debugMsgHist Message history log size.
+ */
+ public void setDebugMessageHistory(int debugMsgHist) {
+ this.debugMsgHist = debugMsgHist;
+ }
+
+ /**
+ * @param msg Message to append to the debug history (prefixed with time, thread name and local node info).
+ */
+ protected void debugLog(String msg) {
+ assert debugMode;
+
+ String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
+ '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
+ "-" + locNode.internalOrder() + "] " +
+ msg;
+
+ debugLog.add(msg0);
+
+ int delta = debugLog.size() - debugMsgHist;
+
+ for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
+ debugLog.poll();
+ }
+
+ /**
* @return Local node ID.
*/
public UUID getLocalNodeId() {
@@ -209,4 +258,21 @@ abstract class TcpDiscoveryImpl {
}
}
}
+
+ /**
+ * @param ackTimeout Acknowledgement timeout.
+ * @return {@code True} if acknowledgement timeout is less than or equal to
+ * maximum acknowledgement timeout, {@code false} otherwise.
+ */
+ protected boolean checkAckTimeout(long ackTimeout) {
+ if (ackTimeout > spi.maxAckTimeout) {
+ LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
+ "(consider increasing 'maxAckTimeout' configuration property) " +
+ "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+
+ return false;
+ }
+
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 24ebb7c..77ddd40 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -18,15 +18,17 @@
package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -54,6 +56,12 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ cfg.setConnectorConfiguration(null);
+ cfg.setPeerClassLoadingEnabled(false);
+ cfg.setTimeServerPortRange(200);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
+
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
if (!clientDiscovery)
@@ -61,6 +69,12 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
cfg.setClientMode(client);
+ if (client) {
+// cfg.setPublicThreadPoolSize(1);
+// cfg.setPeerClassLoadingThreadPoolSize(1);
+// cfg.setIgfsThreadPoolSize(1);
+ }
+
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(PARTITIONED);
@@ -85,6 +99,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
stopAllGrids();
}
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60_000;
+ }
+
/**
* @throws Exception If failed.
*/
@@ -104,6 +123,66 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testManyClientsSequentiallyClientDiscovery() throws Exception {
+ clientDiscovery = true;
+
+ manyClientsSequentially();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void manyClientsSequentially() throws Exception {
+ client = true;
+
+ List<Ignite> clients = new ArrayList<>();
+
+ final int CLIENTS = 50;
+
+ int idx = SRVS;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < CLIENTS; i++) {
+ Ignite ignite = startGrid(idx++);
+
+ log.info("Started node: " + ignite.name());
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ clients.add(ignite);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ Integer key = rnd.nextInt(0, 1000);
+
+ cache.put(key, i);
+
+ assertNotNull(cache.get(key));
+ }
+
+ log.info("All clients started.");
+
+ assertEquals(SRVS + CLIENTS, G.allGrids().size());
+
+ long topVer = -1L;
+
+ for (Ignite ignite : G.allGrids()) {
+ assertEquals(SRVS + CLIENTS, ignite.cluster().nodes().size());
+
+ if (topVer == -1L)
+ topVer = ignite.cluster().topologyVersion();
+ else
+ assertEquals(topVer, ignite.cluster().topologyVersion());
+ }
+
+ for (Ignite client : clients)
+ client.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
private void manyClientsPutGet() throws Exception {
client = true;
@@ -111,7 +190,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
final AtomicBoolean stop = new AtomicBoolean();
- final int THREADS = 30;
+ final int THREADS = 50;
final CountDownLatch latch = new CountDownLatch(THREADS);
@@ -143,6 +222,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
cache.put(key, iter++);
assertNotNull(cache.get(key));
+
+ Thread.sleep(1);
}
log.info("Stopping node: " + ignite.name());
@@ -154,6 +235,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
latch.await();
+ log.info("All clients started.");
+
Thread.sleep(10_000);
log.info("Stop clients.");