You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here" in the original) was not preserved in this plain-text rendering.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/23 10:17:36 UTC
ignite git commit: ignite-6700
Repository: ignite
Updated Branches:
refs/heads/ignite-6700 [created] a30dad5d1
ignite-6700
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a30dad5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a30dad5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a30dad5d
Branch: refs/heads/ignite-6700
Commit: a30dad5d104d74063fd4be8ba4e6bfe9da2b2242
Parents: 026254c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 23 13:16:57 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 23 13:16:57 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 45 ++++++--
.../spi/discovery/tcp/TcpDiscoverySpi.java | 18 +++
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 110 +++++++++++++++++++
3 files changed, 163 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a30dad5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 60f9d4e..7d5edbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1958,8 +1958,36 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
- if (msg.failedNodes() != null) {
- for (UUID nodeId : msg.failedNodes()) {
+ Collection<UUID> msgFailedNodes = msg.failedNodes();
+
+ if (msgFailedNodes != null) {
+ UUID sndId = msg.senderNodeId();
+
+ if (sndId != null) {
+ if (ring.node(sndId) == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignore message failed nodes, sender node is not alive [nodeId=" + sndId +
+ ", failedNodes=" + msgFailedNodes + ']');
+ }
+
+ return;
+ }
+
+ synchronized (mux) {
+ for (TcpDiscoveryNode failedNode : failedNodes.keySet()) {
+ if (failedNode.id().equals(sndId)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId +
+ ", failedNodes=" + msgFailedNodes + ']');
+ }
+
+ return;
+ }
+ }
+ }
+ }
+
+ for (UUID nodeId : msgFailedNodes) {
TcpDiscoveryNode failedNode = ring.node(nodeId);
if (failedNode != null) {
@@ -2816,9 +2844,6 @@ class ServerImpl extends TcpDiscoveryImpl {
log.trace("Next node remains the same [nextId=" + next.id() +
", nextOrder=" + next.internalOrder() + ']');
- // Flag that shows whether next node exists and accepts incoming connections.
- boolean nextNodeExists = sock != null;
-
final boolean sameHost = U.sameMacs(locNode, next);
List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
@@ -2843,8 +2868,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
- nextNodeExists = false;
-
boolean success = false;
boolean openSock = false;
@@ -2973,8 +2996,6 @@ class ServerImpl extends TcpDiscoveryImpl {
sock = null;
}
else {
- // Next node exists and accepts incoming messages.
- nextNodeExists = true;
// Resetting timeout control object to let the code below to use a new one
// for the next bunch of operations.
timeoutHelper = null;
@@ -3066,7 +3087,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (latencyCheck && log.isInfoEnabled())
log.info("Latency check message has been written to socket: " + msg.id());
- spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(newNextNode ? newNext : next,
+ sock,
+ out,
+ msg,
+ timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
long tstamp0 = U.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a30dad5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index eb8ee30..922e787 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1478,6 +1478,24 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
}
/**
+ * @param node Target node; not used by this default implementation, which simply delegates to the node-agnostic overload below.
+ * @param sock Socket.
+ * @param out Stream to write to.
+ * @param msg Message.
+ * @param timeout Timeout.
+ * @throws IOException If IO failed or write timed out.
+ * @throws IgniteCheckedException If marshalling failed.
+ */
+ protected void writeToSocket(
+ ClusterNode node,
+ Socket sock,
+ OutputStream out,
+ TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ writeToSocket(sock, out, msg, timeout); // 'node' is informational only here; overriding subclasses can use it to act per destination.
+ }
+
+ /**
* Writes message to the socket.
*
* @param sock Socket.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a30dad5d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index bf48fcc..d6d484c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -52,6 +53,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
@@ -77,6 +79,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
@@ -2069,6 +2072,42 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testFailedNodeRestoreConnection() throws Exception { // Starts grids 1-4 with fault-injecting SPIs, enables faults, waits for grid 3 to stop, then checks the others see 3 nodes.
+ try {
+ TestRestoreConnectedSpi.startTest = false; // Fault injection stays off while the topology is being formed.
+
+ for (int i = 1; i < 5; i++) {
+ nodeSpi.set(new TestRestoreConnectedSpi(3)); // Every SPI injects faults around the node whose order is 3.
+
+ startGrid(i);
+ }
+
+ awaitPartitionMapExchange();
+
+ info("Start fail test");
+
+ TestRestoreConnectedSpi.startTest = true; // Static flag: enables fault injection in all TestRestoreConnectedSpi instances at once.
+
+ waitNodeStop(getTestIgniteInstanceName(3)); // NOTE(review): relies on grid 3 being the node with order 3 - confirm.
+
+ U.sleep(5000); // NOTE(review): fixed delay before checking topology; a condition-based wait would be less timing-sensitive.
+
+ for (int i = 1; i < 5; i++) {
+ if (i != 3) {
+ Ignite node = ignite(i);
+
+ assertEquals(3, node.cluster().nodes().size()); // Each surviving node must see exactly the three remaining nodes.
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
* @param nodeName Node name.
* @throws Exception If failed.
*/
@@ -2171,6 +2210,77 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
/**
*
*/
+ private static class TestRestoreConnectedSpi extends TcpDiscoverySpi { // Injects socket-write faults around the node with order errNodeOrder once startTest is set.
+ /** When {@code true}, fault injection is active in every instance of this SPI. */
+ static volatile boolean startTest;
+
+ /** End of the write-delay window (ms, from System.currentTimeMillis()) on the faulty node; 0 until its first delayed write. */
+ private long sleepEndTime;
+
+ /** Order of the node around which write faults are injected. */
+ private long errNodeOrder;
+
+ /** Destination of the faulty node's first delayed write; a delayed write to it fails after the sleep. */
+ private ClusterNode errNext;
+
+ /**
+ * @param errNodeOrder Order of the node around which write faults are injected.
+ */
+ TestRestoreConnectedSpi(long errNodeOrder) {
+ this.errNodeOrder = errNodeOrder;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(ClusterNode node,
+ Socket sock,
+ OutputStream out,
+ TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (startTest && !(msg instanceof TcpDiscoveryConnectionCheckMessage)) { // Connection check messages are never faulted.
+ if (node.order() == errNodeOrder) { // Any write to the faulty node times out.
+ log.info("Fail write on message send [node=" + node.id() + ", msg=" + msg + ']');
+
+ throw new SocketTimeoutException();
+ }
+ else if (locNode.order() == errNodeOrder) { // The faulty node itself delays its writes for a ~3 second window.
+ if (sleepEndTime == 0) { // NOTE(review): sleepEndTime/errNext are not volatile; confirm these writes happen on a single thread.
+ errNext = node;
+
+ sleepEndTime = System.currentTimeMillis() + 3000;
+ }
+
+ long sleepTime = sleepEndTime - System.currentTimeMillis();
+
+ if (sleepTime > 0) { // Only writes issued inside the window are delayed; later writes go through unchanged.
+ log.info("Start sleep on message send: " + msg);
+
+ try {
+ U.sleep(sleepTime);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ log.error("Interrupted on socket write: " + e, e);
+
+ throw new IOException(e);
+ }
+
+ log.info("Stop sleep on message send: " + msg);
+
+ if (node.equals(errNext)) { // A delayed write to the first delayed destination then times out.
+ log.info("Fail write after sleep [node=" + node.id() + ", msg=" + msg + ']');
+
+ throw new SocketTimeoutException();
+ }
+ }
+ }
+ }
+
+ super.writeToSocket(node, sock, out, msg, timeout);
+ }
+ }
+
+ /**
+ *
+ */
private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi {
/** */
static volatile boolean fail;