You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/30 16:42:28 UTC
[13/14] ignite git commit: ignite-1758 Fixed issues with client reconnect handling
ignite-1758 Fixed issues with client reconnect handling
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ea3b562
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ea3b562
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ea3b562
Branch: refs/heads/ignite-1093-3
Commit: 6ea3b56205de19ceac89762d9c20c3fe62ab13b9
Parents: 04964b9
Author: sboikov <sb...@gridgain.com>
Authored: Fri Oct 30 16:33:40 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 30 16:33:40 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../apache/ignite/internal/IgniteKernal.java | 14 +-
.../processors/cache/GridCacheProcessor.java | 77 +++--
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../CacheObjectPortableProcessorImpl.java | 9 +
.../util/nio/GridNioRecoveryDescriptor.java | 11 +-
.../communication/tcp/TcpCommunicationSpi.java | 40 ++-
.../ignite/spi/discovery/tcp/ClientImpl.java | 205 ++++++++-----
.../ignite/spi/discovery/tcp/ServerImpl.java | 213 +++++++++----
.../messages/TcpDiscoveryAbstractMessage.java | 11 +
.../messages/TcpDiscoveryNodeAddedMessage.java | 39 +++
.../IgniteClientReconnectCacheTest.java | 33 ++
.../cache/GridCacheAbstractFullApiSelfTest.java | 3 +
.../CacheGetFutureHangsSelfTest.java | 8 +
.../IgniteCacheClientReconnectTest.java | 2 +
.../distributed/IgniteCacheManyClientsTest.java | 14 +-
...gniteClientReconnectMassiveShutdownTest.java | 303 +++++++++++++++++++
.../tcp/TcpDiscoveryMultiThreadedTest.java | 285 +++++++++++++----
18 files changed, 1021 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 1e7d002..de7c10b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -355,6 +355,9 @@ public final class IgniteSystemProperties {
/** Maximum size for affinity assignment history. */
public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
+ /** Maximum size for discovery messages history. */
+ public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE";
+
/** Number of cache operation retries in case of topology exceptions. */
public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4820a93..5a0fe16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -165,6 +165,7 @@ import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
@@ -3158,10 +3159,17 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** {@inheritDoc} */
public void dumpDebugInfo() {
- U.warn(log, "Dumping debug info for node [id=" + ctx.localNodeId() +
+ boolean client = ctx.clientNode();
+
+ ClusterNode locNode = ctx.discovery().localNode();
+
+ UUID routerId = locNode instanceof TcpDiscoveryNode ? ((TcpDiscoveryNode)locNode).clientRouterNodeId() : null;
+
+ U.warn(log, "Dumping debug info for node [id=" + locNode.id() +
", name=" + ctx.gridName() +
- ", order=" + ctx.discovery().localNode().order() +
- ", client=" + ctx.clientNode() + ']');
+ ", order=" + locNode.order() +
+ ", client=" + client +
+ (client && routerId != null ? ", routerId=" + routerId : "") + ']');
ctx.cache().context().exchange().dumpDebugInfo();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5bf4ac7..301e7d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1803,61 +1803,80 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+ boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+
// Collect dynamically started caches to a single object.
- Collection<DynamicCacheChangeRequest> reqs =
- new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
+ Collection<DynamicCacheChangeRequest> reqs;
- boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+ Map<String, Map<UUID, Boolean>> clientNodesMap;
- Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches;
+ if (reconnect) {
+ reqs = new ArrayList<>(caches.size());
- for (DynamicCacheDescriptor desc : descs.values()) {
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+ clientNodesMap = U.newHashMap(caches.size());
- req.startCacheConfiguration(desc.cacheConfiguration());
+ for (GridCacheAdapter<?, ?> cache : caches.values()) {
+ DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
- req.cacheType(desc.cacheType());
+ if (desc == null)
+ continue;
- req.deploymentId(desc.deploymentId());
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
- reqs.add(req);
- }
+ req.startCacheConfiguration(desc.cacheConfiguration());
- for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+ req.cacheType(desc.cacheType());
- req.startCacheConfiguration(desc.cacheConfiguration());
+ req.deploymentId(desc.deploymentId());
- req.template(true);
+ reqs.add(req);
- req.deploymentId(desc.deploymentId());
+ Boolean nearEnabled = cache.isNear();
+
+ Map<UUID, Boolean> map = U.newHashMap(1);
+
+ map.put(nodeId, nearEnabled);
- reqs.add(req);
+ clientNodesMap.put(cache.name(), map);
+ }
}
+ else {
+ reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
- DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs);
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
- Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap();
+ req.startCacheConfiguration(desc.cacheConfiguration());
- if (reconnect) {
- clientNodesMap = U.newHashMap(caches.size());
+ req.cacheType(desc.cacheType());
- for (GridCacheAdapter<?, ?> cache : caches.values()) {
- Boolean nearEnabled = cache.isNear();
+ req.deploymentId(desc.deploymentId());
- Map<UUID, Boolean> map = U.newHashMap(1);
+ reqs.add(req);
+ }
- map.put(nodeId, nearEnabled);
+ for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
- clientNodesMap.put(cache.name(), map);
+ req.startCacheConfiguration(desc.cacheConfiguration());
+
+ req.template(true);
+
+ req.deploymentId(desc.deploymentId());
+
+ reqs.add(req);
}
+
+ clientNodesMap = ctx.discovery().clientNodesMap();
}
- req.clientNodes(clientNodesMap);
+ DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
+
+ batch.clientNodes(clientNodesMap);
- req.clientReconnect(reconnect);
+ batch.clientReconnect(reconnect);
- return req;
+ return batch;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 83867f4..356a85b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -192,9 +192,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
ClusterNode loc = cctx.localNode();
- long startTime = loc.metrics().getStartTime();
-
- assert startTime > 0;
+ assert loc.metrics().getStartTime() > 0;
final long startTopVer = loc.order();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
index 2de9d84..f0319aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -39,6 +39,8 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.portable.api.IgnitePortables;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -371,6 +374,12 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
else
throw e;
}
+ catch (CacheException e) {
+ if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class))
+ continue;
+ else
+ throw e;
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 88837de..5647239 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -193,14 +193,19 @@ public class GridNioRecoveryDescriptor {
/**
* Node left callback.
+ *
+ * @return {@code False} if descriptor is reserved.
*/
- public void onNodeLeft() {
+ public boolean onNodeLeft() {
GridNioFuture<?>[] futs = null;
synchronized (this) {
nodeLeft = true;
- if (!reserved && !msgFuts.isEmpty()) {
+ if (reserved)
+ return false;
+
+ if (!msgFuts.isEmpty()) {
futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
msgFuts.clear();
@@ -209,6 +214,8 @@ public class GridNioRecoveryDescriptor {
if (futs != null)
completeOnNodeLeft(futs);
+
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5ea2c02..e8bd8a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -61,6 +61,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -1358,7 +1359,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public int getOutboundMessagesQueueSize() {
- return nioSrvr.outboundMessagesQueueSize();
+ GridNioServer<Message> srv = nioSrvr;
+
+ return srv != null ? srv.outboundMessagesQueueSize() : 0;
}
/** {@inheritDoc} */
@@ -1870,25 +1873,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* @param node Destination node.
* @param msg Message to send.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
- public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
- sendMessage0(node, msg, ackClosure);
+ sendMessage0(node, msg, ackC);
}
/**
* @param node Destination node.
* @param msg Message to send.
- * @param ackClosure Ack closure.
+ * @param ackC Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
- private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
assert node != null;
assert msg != null;
@@ -1896,13 +1899,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isTraceEnabled())
log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
- ClusterNode localNode = getLocalNode();
+ ClusterNode locNode = getLocalNode();
- if (localNode == null)
+ if (locNode == null)
throw new IgniteSpiException("Local node has not been started or fully initialized " +
"[isStopping=" + getSpiContext().isStopping() + ']');
- if (node.id().equals(localNode.id()))
+ if (node.id().equals(locNode.id()))
notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
@@ -1915,10 +1918,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
UUID nodeId = null;
- if (!client.async() && !localNode.version().equals(node.version()))
+ if (!client.async() && !locNode.version().equals(node.version()))
nodeId = node.id();
- retry = client.sendMessage(nodeId, msg, ackClosure);
+ retry = client.sendMessage(nodeId, msg, ackC);
client.release();
@@ -2292,6 +2295,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return null;
}
+ if (getSpiContext().node(node.id()) == null) {
+ recoveryDesc.release();
+
+ U.closeQuiet(ch);
+
+ throw new ClusterTopologyCheckedException("Failed to send message, " +
+ "node left cluster: " + node);
+ }
+
long rcvCnt = -1;
SSLEngine sslEngine = null;
@@ -3100,10 +3112,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert !left.isEmpty();
for (ClientKey id : left) {
- GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
+ GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
- if (recoverySnd != null)
- recoverySnd.onNodeLeft();
+ if (recoverySnd != null && recoverySnd.onNodeLeft())
+ recoveryDescs.remove(id);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e4c29db..a4619c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -270,8 +270,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
- timer.cancel();
-
if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
msgWorker.addMessage(SPI_STOP);
@@ -297,6 +295,8 @@ class ClientImpl extends TcpDiscoveryImpl {
U.join(sockWriter, log);
U.join(sockReader, log);
+ timer.cancel();
+
spi.printStopInfo();
}
@@ -461,7 +461,8 @@ class ClientImpl extends TcpDiscoveryImpl {
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
- @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
+ @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+ throws IgniteSpiException, InterruptedException {
Collection<InetSocketAddress> addrs = null;
long startTime = U.currentTimeMillis();
@@ -501,7 +502,7 @@ class ClientImpl extends TcpDiscoveryImpl {
InetSocketAddress addr = it.next();
- T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
+ T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
if (sockAndRes == null) {
it.remove();
@@ -511,11 +512,11 @@ class ClientImpl extends TcpDiscoveryImpl {
assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
- Socket sock = sockAndRes.get1();
+ Socket sock = sockAndRes.get1().socket();
switch (sockAndRes.get2()) {
case RES_OK:
- return new T2<>(sock, sockAndRes.get3());
+ return new T2<>(sockAndRes.get1(), sockAndRes.get3());
case RES_CONTINUE_JOIN:
case RES_WAIT:
@@ -548,7 +549,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param addr Address.
* @return Socket, connect response and client acknowledge support flag.
*/
- @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+ @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
assert addr != null;
if (log.isDebugEnabled())
@@ -621,7 +622,8 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
", rmtNodeId=" + rmtNodeId + ']');
- return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+ return new T3<>(new SocketStream(sock),
+ spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
res.clientAck());
}
catch (IOException | IgniteCheckedException e) {
@@ -708,7 +710,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<ClusterNode> top = topHist.get(topVer);
- assert top != null : msg;
+ assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']';
return top;
}
@@ -765,7 +767,10 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void brakeConnection() {
- U.closeQuiet(msgWorker.currSock);
+ SocketStream sockStream = msgWorker.currSock;
+
+ if (sockStream != null)
+ U.closeQuiet(sockStream.socket());
}
/** {@inheritDoc} */
@@ -826,7 +831,7 @@ class ClientImpl extends TcpDiscoveryImpl {
private final Object mux = new Object();
/** */
- private Socket sock;
+ private SocketStream sockStream;
/** */
private UUID rmtNodeId;
@@ -838,12 +843,12 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- * @param sock Socket.
+ * @param sockStream Socket.
* @param rmtNodeId Rmt node id.
*/
- public void setSocket(Socket sock, UUID rmtNodeId) {
+ public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
synchronized (mux) {
- this.sock = sock;
+ this.sockStream = sockStream;
this.rmtNodeId = rmtNodeId;
@@ -854,22 +859,24 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
while (!isInterrupted()) {
- Socket sock;
+ SocketStream sockStream;
UUID rmtNodeId;
synchronized (mux) {
- if (this.sock == null) {
+ if (this.sockStream == null) {
mux.wait();
continue;
}
- sock = this.sock;
+ sockStream = this.sockStream;
rmtNodeId = this.rmtNodeId;
}
+ Socket sock = sockStream.socket();
+
try {
- InputStream in = new BufferedInputStream(sock.getInputStream());
+ InputStream in = sockStream.stream();
sock.setKeepAlive(true);
sock.setTcpNoDelay(true);
@@ -912,18 +919,14 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean ack = msg instanceof TcpDiscoveryClientAckResponse;
- if (!ack) {
- if (spi.ensured(msg) && joinLatch.getCount() == 0L)
- lastMsgId = msg.id();
-
+ if (!ack)
msgWorker.addMessage(msg);
- }
else
sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
}
}
catch (IOException e) {
- msgWorker.addMessage(new SocketClosedMessage(sock));
+ msgWorker.addMessage(new SocketClosedMessage(sockStream));
if (log.isDebugEnabled())
U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
@@ -932,8 +935,8 @@ class ClientImpl extends TcpDiscoveryImpl {
U.closeQuiet(sock);
synchronized (mux) {
- if (this.sock == sock) {
- this.sock = null;
+ if (this.sockStream == sockStream) {
+ this.sockStream = null;
this.rmtNodeId = null;
}
}
@@ -1125,7 +1128,7 @@ class ClientImpl extends TcpDiscoveryImpl {
*/
private class Reconnector extends IgniteSpiThread {
/** */
- private volatile Socket sock;
+ private volatile SocketStream sockStream;
/** */
private boolean clientAck;
@@ -1148,7 +1151,10 @@ class ClientImpl extends TcpDiscoveryImpl {
public void cancel() {
interrupt();
- U.closeQuiet(sock);
+ SocketStream sockStream = this.sockStream;
+
+ if (sockStream != null)
+ U.closeQuiet(sockStream.socket());
}
/** {@inheritDoc} */
@@ -1166,24 +1172,26 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
while (true) {
- T2<Socket, Boolean> joinRes = joinTopology(true, timeout);
+ T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
if (joinRes == null) {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.joinTimeout + ']'));
}
else
U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
- " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+ " configuration property) [networkTimeout=" + spi.netTimeout + ']');
return;
}
- sock = joinRes.get1();
+ sockStream = joinRes.get1();
clientAck = joinRes.get2();
+ Socket sock = sockStream.socket();
+
if (isInterrupted())
throw new InterruptedException();
@@ -1194,7 +1202,7 @@ class ClientImpl extends TcpDiscoveryImpl {
sock.setSoTimeout((int)spi.netTimeout);
- InputStream in = new BufferedInputStream(sock.getInputStream());
+ InputStream in = sockStream.stream();
sock.setKeepAlive(true);
sock.setTcpNoDelay(true);
@@ -1264,11 +1272,16 @@ class ClientImpl extends TcpDiscoveryImpl {
catch (IOException | IgniteCheckedException e) {
err = e;
+ success = false;
+
U.error(log, "Failed to reconnect", e);
}
finally {
if (!success) {
- U.closeQuiet(sock);
+ SocketStream sockStream = this.sockStream;
+
+ if (sockStream != null)
+ U.closeQuiet(sockStream.socket());
if (join)
joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
@@ -1288,10 +1301,7 @@ class ClientImpl extends TcpDiscoveryImpl {
private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
/** */
- private Socket currSock;
-
- /** Indicates that pending messages are currently processed. */
- private boolean pending;
+ private SocketStream currSock;
/** */
private Reconnector reconnector;
@@ -1338,11 +1348,13 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else if (msg == SPI_STOP) {
+ boolean connected = state == CONNECTED;
+
state = STOPPED;
assert spi.getSpiContext().isStopping();
- if (currSock != null) {
+ if (connected && currSock != null) {
TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
leftMsg.client(true);
@@ -1467,7 +1479,10 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
finally {
- U.closeQuiet(currSock);
+ SocketStream currSock = this.currSock;
+
+ if (currSock != null)
+ U.closeQuiet(currSock.socket());
if (joinLatch.getCount() > 0)
joinError(new IgniteSpiException("Some error in join process.")); // This should not occur.
@@ -1490,7 +1505,7 @@ class ClientImpl extends TcpDiscoveryImpl {
joinCnt++;
- T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+ T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
if (joinRes == null) {
if (join)
@@ -1506,7 +1521,7 @@ class ClientImpl extends TcpDiscoveryImpl {
currSock = joinRes.get1();
- sockWriter.setSocket(joinRes.get1(), joinRes.get2());
+ sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());
if (spi.joinTimeout > 0) {
final int joinCnt0 = joinCnt;
@@ -1551,6 +1566,9 @@ class ClientImpl extends TcpDiscoveryImpl {
processPingRequest();
spi.stats.onMessageProcessingFinished(msg);
+
+ if (spi.ensured(msg) && state == CONNECTED)
+ lastMsgId = msg.id();
}
/**
@@ -1604,8 +1622,10 @@ class ClientImpl extends TcpDiscoveryImpl {
if (msg.topologyHistory() != null)
topHist.putAll(msg.topologyHistory());
}
- else if (log.isDebugEnabled())
- log.debug("Discarding node added message with empty topology: " + msg);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node added message with empty topology: " + msg);
+ }
}
else if (log.isDebugEnabled())
log.debug("Discarding node added message (this message has already been processed) " +
@@ -1625,8 +1645,10 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.onExchange(newNodeId, newNodeId, data, null);
}
}
- else if (log.isDebugEnabled())
- log.debug("Ignore topology message, local node not added to topology: " + msg);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
+ }
}
}
@@ -1653,6 +1675,11 @@ class ClientImpl extends TcpDiscoveryImpl {
locNode.order(topVer);
+ for (Iterator<Long> it = topHist.keySet().iterator(); it.hasNext();) {
+ if (it.next() >= topVer)
+ it.remove();
+ }
+
Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);
notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
@@ -1712,7 +1739,7 @@ class ClientImpl extends TcpDiscoveryImpl {
assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
", node=" + node + ", top=" + top + ']';
- if (!pending && joinLatch.getCount() > 0) {
+ if (state != CONNECTED) {
if (log.isDebugEnabled())
log.debug("Discarding node add finished message (join process is not finished): " + msg);
@@ -1725,8 +1752,10 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onNodeJoined();
}
}
- else if (log.isDebugEnabled())
- log.debug("Ignore topology message, local node not added to topology: " + msg);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
+ }
}
}
@@ -1756,7 +1785,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
- if (!pending && joinLatch.getCount() > 0) {
+ if (state != CONNECTED) {
if (log.isDebugEnabled())
log.debug("Discarding node left message (join process is not finished): " + msg);
@@ -1767,8 +1796,10 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onNodeLeft();
}
- else if (log.isDebugEnabled())
- log.debug("Ignore topology message, local node not added to topology: " + msg);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
+ }
}
}
@@ -1809,7 +1840,7 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
- if (!pending && joinLatch.getCount() > 0) {
+ if (state != CONNECTED) {
if (log.isDebugEnabled())
log.debug("Discarding node failed message (join process is not finished): " + msg);
@@ -1875,25 +1906,18 @@ class ClientImpl extends TcpDiscoveryImpl {
if (reconnector != null) {
assert msg.success() : msg;
- currSock = reconnector.sock;
+ currSock = reconnector.sockStream;
- sockWriter.setSocket(currSock, reconnector.clientAck);
+ sockWriter.setSocket(currSock.socket(), reconnector.clientAck);
sockReader.setSocket(currSock, locNode.clientRouterNodeId());
reconnector = null;
- pending = true;
-
- try {
- for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
- if (log.isDebugEnabled())
- log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
+ for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+ if (log.isDebugEnabled())
+ log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
- processDiscoveryMessage(pendingMsg);
- }
- }
- finally {
- pending = false;
+ processDiscoveryMessage(pendingMsg);
}
}
else {
@@ -1921,7 +1945,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- if (msg.verified() && state == CONNECTED) {
+ if (state == CONNECTED) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
@@ -2048,13 +2072,56 @@ class ClientImpl extends TcpDiscoveryImpl {
*/
private static class SocketClosedMessage {
/** */
+ private final SocketStream sock;
+
+ /**
+ * @param sock Socket.
+ */
+ private SocketClosedMessage(SocketStream sock) {
+ this.sock = sock;
+ }
+ }
+
+ /**
+ * Socket paired with a buffered input stream created over the socket's input stream.
+ */
+ private static class SocketStream {
+ /** */
private final Socket sock;
+ /** */
+ private final InputStream in;
+
/**
* @param sock Socket.
+ * @throws IOException If failed to create stream.
*/
- private SocketClosedMessage(Socket sock) {
+ public SocketStream(Socket sock) throws IOException {
+ assert sock != null;
+
this.sock = sock;
+
+ this.in = new BufferedInputStream(sock.getInputStream());
+ }
+
+ /**
+ * @return Socket.
+ */
+ Socket socket() {
+ return sock;
+
+ }
+
+ /**
+ * @return Socket input stream.
+ */
+ InputStream stream() {
+ return in;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return sock.toString();
}
}
@@ -2077,4 +2144,4 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
STOPPED
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b8df846..ee9f818 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -126,7 +126,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -154,6 +156,9 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
@SuppressWarnings("All")
class ServerImpl extends TcpDiscoveryImpl {
/** */
+ private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
+
+ /** */
private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
@@ -1250,9 +1255,11 @@ class ServerImpl extends TcpDiscoveryImpl {
lsnr.onDiscovery(type, topVer, node, top, hist, null);
}
- else if (log.isDebugEnabled())
- log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
- ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
+ ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+ }
}
/**
@@ -1447,6 +1454,12 @@ class ServerImpl extends TcpDiscoveryImpl {
tmp = U.arrayList(readers);
}
+ for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) {
+ U.interrupt(msgWorker);
+
+ U.join(msgWorker, log);
+ }
+
U.interrupt(tmp);
U.joinThreads(tmp, log);
@@ -1744,22 +1757,36 @@ class ServerImpl extends TcpDiscoveryImpl {
* Discovery messages history used for client reconnect.
*/
private class EnsuredMessageHistory {
- /** */
- private static final int MAX = 1024;
-
/** Pending messages. */
- private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ private final GridBoundedLinkedHashSet<TcpDiscoveryAbstractMessage>
+ msgs = new GridBoundedLinkedHashSet<>(ENSURED_MSG_HIST_SIZE);
/**
* @param msg Adds message.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- assert spi.ensured(msg) : msg;
+ assert spi.ensured(msg) && msg.verified() : msg;
+
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
- msgs.addLast(msg);
+ TcpDiscoveryNode node = addedMsg.node();
- while (msgs.size() > MAX)
- msgs.pollFirst();
+ if (node.isClient() && !msgs.contains(msg)) {
+ Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+
+ Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size());
+
+ for (TcpDiscoveryNode n0 : allNodes) {
+ if (n0.internalOrder() != 0 && n0.internalOrder() < node.internalOrder())
+ top.add(n0);
+ }
+
+ addedMsg.clientTopology(top);
+ }
+ }
+
+ msgs.add(msg);
}
/**
@@ -1782,11 +1809,11 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage msg : msgs) {
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
- if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id()))
+ if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
res = new ArrayList<>(msgs.size());
}
- if (res != null && msg.verified())
+ if (res != null)
res.add(prepare(msg, node.id()));
}
@@ -1812,7 +1839,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.id().equals(lastMsgId))
skip = false;
}
- else if (msg.verified())
+ else
cp.add(prepare(msg, node.id()));
}
@@ -1820,7 +1847,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled()) {
if (cp == null)
- log.debug("Failed to find messages history [node=" + node + ", lastMsgId" + lastMsgId + ']');
+ log.debug("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']');
else
log.debug("Found messages history [node=" + node + ", hist=" + cp + ']');
}
@@ -1835,8 +1862,21 @@ class ServerImpl extends TcpDiscoveryImpl {
* @return Prepared message.
*/
private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
- if (msg instanceof TcpDiscoveryNodeAddedMessage)
- prepareNodeAddedMessage(msg, destNodeId, null, null, null);
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+ if (addedMsg.node().id().equals(destNodeId)) {
+ assert addedMsg.clientTopology() != null : addedMsg;
+
+ TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg);
+
+ prepareNodeAddedMessage(msg0, destNodeId, null, null, null);
+
+ msg0.topology(addedMsg.clientTopology());
+
+ return msg0;
+ }
+ }
return msg;
}
@@ -2132,7 +2172,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- if (spi.ensured(msg))
+ if (spi.ensured(msg) && redirectToClients(msg))
msgHist.add(msg);
if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -2161,19 +2201,9 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Sends message across the ring.
- *
- * @param msg Message to send
+ * @param msg Message.
*/
- @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
- private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
- assert msg != null;
-
- assert ring.hasRemoteNodes();
-
- for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
- msgLsnr.apply(msg);
-
+ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
if (redirectToClients(msg)) {
byte[] marshalledMsg = null;
@@ -2193,9 +2223,28 @@ class ServerImpl extends TcpDiscoveryImpl {
msgClone = msg;
}
+ prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null);
+
clientMsgWorker.addMessage(msgClone);
}
}
+ }
+
+ /**
+ * Sends message across the ring.
+ *
+ * @param msg Message to send
+ */
+ @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
+ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null;
+
+ assert ring.hasRemoteNodes();
+
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
+ msgLsnr.apply(msg);
+
+ sendMessageToClients(msg);
Collection<TcpDiscoveryNode> failedNodes;
@@ -2810,7 +2859,7 @@ class ServerImpl extends TcpDiscoveryImpl {
"[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
}
else {
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(reconMsg))
sendMessageAcrossRing(reconMsg);
}
}
@@ -3052,8 +3101,11 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.client(msg.client());
processNodeAddedMessage(nodeAddedMsg);
+
+ if (nodeAddedMsg.verified())
+ msgHist.add(nodeAddedMsg);
}
- else if (ring.hasRemoteNodes())
+ else if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
@@ -3155,8 +3207,13 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Failing reconnecting client node because failed to restore pending " +
"messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
- processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
- node.id(), node.internalOrder()));
+ TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
+ node.id(), node.internalOrder());
+
+ processNodeFailedMessage(nodeFailedMsg);
+
+ if (nodeFailedMsg.verified())
+ msgHist.add(nodeFailedMsg);
}
}
else if (log.isDebugEnabled())
@@ -3172,12 +3229,12 @@ class ServerImpl extends TcpDiscoveryImpl {
locNodeId + ", clientNodeId=" + nodeId + ']');
}
else {
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
}
else {
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
}
@@ -3239,6 +3296,9 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
+ if (addFinishMsg.verified())
+ msgHist.add(addFinishMsg);
+
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
@@ -3249,7 +3309,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
// Local node already has node from message in local topology.
// Just pass it to coordinator via the ring.
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
if (log.isDebugEnabled())
@@ -3437,7 +3497,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
@@ -3572,7 +3632,7 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscovery(EVT_NODE_JOINED, topVer, locNode);
}
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
checkPendingCustomMessages();
@@ -3740,7 +3800,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (ring.hasRemoteNodes()) {
+ if (sendMessageToRemotes(msg)) {
try {
sendMessageAcrossRing(msg);
}
@@ -3761,6 +3821,19 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * @param msg Message to send.
+ * @return {@code True} if message should be sent across the ring.
+ */
+ private boolean sendMessageToRemotes(TcpDiscoveryAbstractMessage msg) {
+ if (ring.hasRemoteNodes())
+ return true;
+
+ sendMessageToClients(msg);
+
+ return false;
+ }
+
+ /**
* Processes node failed message.
*
* @param msg Node failed message.
@@ -3892,7 +3965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onNodeFailed();
}
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
else {
if (log.isDebugEnabled())
@@ -4032,7 +4105,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
@@ -4098,7 +4171,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (ring.hasRemoteNodes()) {
+ if (sendMessageToRemotes(msg)) {
if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
!hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on coordinator.
@@ -4135,16 +4208,22 @@ class ServerImpl extends TcpDiscoveryImpl {
failedNode = failedNodes.contains(clientNode);
}
- if (!failedNode)
- processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
- clientNode.id(), clientNode.internalOrder()));
+ if (!failedNode) {
+ TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(
+ locNodeId, clientNode.id(), clientNode.internalOrder());
+
+ processNodeFailedMessage(nodeFailedMsg);
+
+ if (nodeFailedMsg.verified())
+ msgHist.add(nodeFailedMsg);
+ }
}
}
}
}
}
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
else {
@@ -4351,7 +4430,7 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscoveryListener(msg);
}
- if (ring.hasRemoteNodes())
+ if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
}
@@ -4363,8 +4442,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
TcpDiscoveryCustomEventMessage msg;
- while ((msg = pendingCustomMsgs.poll()) != null)
+ while ((msg = pendingCustomMsgs.poll()) != null) {
processCustomMessage(msg);
+
+ if (msg.verified())
+ msgHist.add(msg);
+ }
}
}
@@ -5293,19 +5376,14 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else {
- try {
- if (log.isDebugEnabled())
- log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
- + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
+ assert topologyInitialized(msg) : msg;
- writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
- spi.failureDetectionTimeout() : spi.getSocketTimeout());
- }
- finally {
- clearNodeAddedMessage(msg);
- }
+ writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getSocketTimeout());
}
}
catch (IgniteCheckedException | IOException e) {
@@ -5325,6 +5403,21 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * @param msg Message.
+ * @return {@code True} if topology initialized.
+ */
+ private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+ if (clientNodeId.equals(addedMsg.node().id()))
+ return addedMsg.topology() != null;
+ }
+
+ return true;
+ }
+
+ /**
* @param res Ping result.
*/
public void pingResult(boolean res) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index c50f791..875d18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -79,6 +79,17 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
}
/**
+ * @param msg Message.
+ */
+ protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) {
+ this.id = msg.id;
+ this.verifierNodeId = msg.verifierNodeId;
+ this.topVer = msg.topVer;
+ this.flags = msg.flags;
+ this.pendingIdx = msg.pendingIdx;
+ }
+
+ /**
* Gets creator node.
*
* @return Creator node ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 5a7146d..6f8e14e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -55,6 +55,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
+ /** */
+ @GridToStringInclude
+ private transient Collection<TcpDiscoveryNode> clientTop;
+
/** Topology snapshots history. */
private Map<Long, Collection<ClusterNode>> topHist;
@@ -93,6 +97,24 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * @param msg Message.
+ */
+ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+ super(msg);
+
+ this.node = msg.node;
+ this.msgs = msg.msgs;
+ this.discardMsgId = msg.discardMsgId;
+ this.discardCustomMsgId = msg.discardCustomMsgId;
+ this.top = msg.top;
+ this.clientTop = msg.clientTop;
+ this.topHist = msg.topHist;
+ this.newNodeDiscoData = msg.newNodeDiscoData;
+ this.oldNodesDiscoData = msg.oldNodesDiscoData;
+ this.gridStartTime = msg.gridStartTime;
+ }
+
+ /**
* Gets newly added node.
*
* @return New node.
@@ -133,6 +155,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
*
* @param msgs Pending messages to send to new node.
* @param discardMsgId Discarded message ID.
+ * @param discardCustomMsgId Discarded custom message ID.
*/
public void messages(
@Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@@ -163,6 +186,22 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * @param top Topology at the moment when client joined.
+ */
+ public void clientTopology(Collection<TcpDiscoveryNode> top) {
+ assert top != null && !top.isEmpty() : top;
+
+ this.clientTop = top;
+ }
+
+ /**
+ * @return Topology at the moment when client joined.
+ */
+ public Collection<TcpDiscoveryNode> clientTopology() {
+ return clientTop;
+ }
+
+ /**
* Gets topology snapshots history.
*
* @return Map with topology snapshots history.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index edd95e9..6131f54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -1128,6 +1128,39 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
/**
+ * @throws Exception If failed.
+ */
+ public void testReconnectDestroyCache() throws Exception {
+ clientMode = true;
+
+ Ignite client = startGrid(SRV_CNT);
+
+ CacheConfiguration<Integer, Integer> ccfg1 = new CacheConfiguration<>();
+ ccfg1.setName("cache1");
+
+ CacheConfiguration<Integer, Integer> ccfg2 = new CacheConfiguration<>();
+ ccfg2.setName("cache2");
+
+ final Ignite srv = grid(0);
+
+ srv.createCache(ccfg1);
+ srv.createCache(ccfg2).put(1, 1);
+
+ IgniteCache<Integer, Integer> cache = client.cache("cache2");
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srv.destroyCache("cache1");
+ }
+ });
+
+ cache.put(2, 2);
+
+ assertEquals(1, (Object)cache.get(1));
+ assertEquals(2, (Object)cache.get(2));
+ }
+
+ /**
* @param client Client.
* @param disconnectLatch Disconnect event latch.
* @param reconnectLatch Reconnect event latch.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index a6b5535..530ff61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.testframework.GridTestUtils;
@@ -183,6 +184,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES)
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index 51e76f6..659520b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -32,6 +32,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -41,6 +44,9 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
* Test for reproducing problems during simultaneously Ignite instances stopping and cache requests executing.
*/
public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
/** Grid count. */
private static final int GRID_CNT = 8;
@@ -55,6 +61,8 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
OptimizedMarshaller marsh = new OptimizedMarshaller();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
index 2aa4280..37c5a6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
@@ -94,6 +94,8 @@ public class IgniteCacheClientReconnectTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
+
+ stopAllGrids();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 78fc590..242b12d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -113,13 +113,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testManyClients() throws Throwable {
- manyClientsPutGet();
- }
-
- /**
- * @throws Exception If failed.
- */
public void testManyClientsClientDiscovery() throws Throwable {
clientDiscovery = true;
@@ -138,6 +131,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testManyClientsForceServerMode() throws Throwable {
+ manyClientsPutGet();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
private void manyClientsSequentially() throws Exception {
client = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
new file mode 100644
index 0000000..6f0e887
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.HashMap;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Client reconnect test in multithreaded mode while cache operations are in progress.
+ */
+public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest {
+ /** Number of server nodes started by the test. */
+ private static final int GRID_CNT = 14;
+
+ /** Number of client nodes started after the servers. */
+ private static final int CLIENT_GRID_CNT = 14;
+
+ /** Client mode flag written into the configuration of every node started while it is set. */
+ private static volatile boolean clientMode;
+
+ /** IP finder handed to the discovery SPI of each node. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClientMode(clientMode);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+
+ Thread.sleep(5000);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60 * 1000;
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void _testMassiveServersShutdown1() throws Exception {
+ massiveServersShutdown(StopType.FAIL_EVENT);
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testMassiveServersShutdown2() throws Exception {
+ massiveServersShutdown(StopType.SIMULATE_FAIL);
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void _testMassiveServersShutdown3() throws Exception {
+ massiveServersShutdown(StopType.CLOSE);
+ }
+
+ /**
+ * @param stopType How to stop node.
+ * @throws Exception If any error occurs.
+ */
+ private void massiveServersShutdown(final StopType stopType) throws Exception {
+ clientMode = false;
+
+ startGridsMultiThreaded(GRID_CNT);
+
+ clientMode = true;
+
+ startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ // Starting a cache dynamically.
+ Ignite client = grid(GRID_CNT);
+
+ assertTrue(client.configuration().isClientMode());
+
+ CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setBackups(2);
+ cfg.setOffHeapMaxMemory(0);
+ cfg.setMemoryMode(OFFHEAP_TIERED);
+
+ IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg);
+
+ HashMap<String, Integer> put = new HashMap<>();
+
+ // Load some data.
+ for (int i = 0; i < 10_000; i++)
+ put.put(String.valueOf(i), i);
+
+ cache.putAll(put);
+
+ // Preparing client nodes and starting cache operations from them.
+ final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>();
+
+ for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+ clientIdx.add(i);
+
+ IgniteInternalFuture<?> clientsFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx = clientIdx.take();
+
+ Ignite ignite = grid(idx);
+
+ Thread.currentThread().setName("client-thread-" + ignite.name());
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ IgniteCache<String, Integer> cache = ignite.cache(null);
+
+ IgniteTransactions txs = ignite.transactions();
+
+ Random rand = new Random();
+
+ while (!done.get()) {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
+
+ tx.commit();
+ }
+ catch (ClusterTopologyException ex) {
+ ex.retryReadyFuture().get();
+ }
+ catch (IgniteException | CacheException e) {
+ if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
+ IgniteClientDisconnectedException cause = X.cause(e,
+ IgniteClientDisconnectedException.class);
+
+ assert cause != null;
+
+ cause.reconnectFuture().get();
+ }
+ else if (X.hasCause(e, ClusterTopologyException.class)) {
+ ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+ assert cause != null;
+
+ cause.retryReadyFuture().get();
+ }
+ else
+ throw e;
+ }
+ }
+
+ return null;
+ }
+ },
+ CLIENT_GRID_CNT, "client-thread");
+
+ try {
+ // Killing half of the server nodes.
+ final int srvsToKill = GRID_CNT / 2;
+
+ final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>();
+
+ for (int i = 0; i < srvsToKill; i++)
+ victims.add(i);
+
+ final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>();
+
+ for (int i = srvsToKill; i < GRID_CNT; i++)
+ assassins.add(i);
+
+ IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Thread.sleep(5_000);
+
+ Ignite assassin = grid(assassins.take());
+
+ assertFalse(assassin.configuration().isClientMode());
+
+ Ignite victim = grid(victims.take());
+
+ assertFalse(victim.configuration().isClientMode());
+
+ log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']');
+
+ switch (stopType) {
+ case CLOSE:
+ victim.close();
+
+ break;
+
+ case FAIL_EVENT:
+ UUID nodeId = victim.cluster().localNode().id();
+
+ assassin.configuration().getDiscoverySpi().failNode(nodeId, null);
+
+ break;
+
+ case SIMULATE_FAIL:
+ ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+ break;
+
+ default:
+ fail();
+ }
+
+ return null;
+ }
+ },
+ assassins.size(), "kill-thread");
+
+ srvsShutdownFut.get();
+
+ Thread.sleep(15_000);
+
+ done.set(true);
+
+ clientsFut.get();
+
+ awaitPartitionMapExchange();
+
+ for (int k = 0; k < 10_000; k++) {
+ String key = String.valueOf(k);
+
+ Object val = cache.get(key);
+
+ for (int i = srvsToKill; i < GRID_CNT; i++)
+ assertEquals(val, ignite(i).cache(null).get(key));
+ }
+ }
+ finally {
+ done.set(true);
+ }
+ }
+
+ /**
+ * Ways in which a kill thread takes a victim server node down.
+ */
+ enum StopType {
+ /** Victim is stopped by calling {@code close()} on it. */
+ CLOSE,
+
+ /** Victim's own discovery SPI is told to simulate a node failure. */
+ SIMULATE_FAIL,
+
+ /** Assassin's discovery SPI is asked to fail the victim node by its ID. */
+ FAIL_EVENT
+ }
+}