You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/13 10:32:15 UTC
[18/31] ignite git commit: ignite-4499 Drop node from topology in
case when connection creation is impossible
ignite-4499 Drop node from topology in case when connection creation is impossible
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9aaf035
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9aaf035
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9aaf035
Branch: refs/heads/ignite-4436-2
Commit: f9aaf0353cea54afefea4caac74b1583eb17969b
Parents: ecf4b8b
Author: agura <agura>
Authored: Wed Jan 18 18:04:45 2017 +0300
Committer: agura <agura>
Committed: Wed Jan 18 18:04:45 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../communication/tcp/TcpCommunicationSpi.java | 16 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 88 ++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 61 ++--
.../messages/TcpDiscoveryAbstractMessage.java | 21 ++
.../tcp/TcpCommunicationSpiDropNodesTest.java | 322 +++++++++++++++++++
.../TcpCommunicationSpiFaultyClientTest.java | 270 ++++++++++++++++
.../ignite/testframework/GridTestNode.java | 1 +
.../testframework/junits/GridAbstractTest.java | 2 +
.../IgniteSpiCommunicationSelfTestSuite.java | 5 +
.../cache/IgniteCacheAbstractQuerySelfTest.java | 8 +-
11 files changed, 758 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 0da0f49..d77b2fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -65,6 +65,9 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_NO_DISCO_ORDER = "IGNITE_NO_DISCO_ORDER";
+ /** Reconnect delay in milliseconds for a client node that was forcibly failed from the cluster (default 10 000; a value &lt;= 0 means reconnect immediately). */
+ public static final String IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY = "IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY";
+
/**
* If this system property is set to {@code false} - no checks for new versions will
* be performed by Ignite. By default, Ignite periodically checks for the new
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1fe437c..94b7efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2550,6 +2551,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"operating system firewall is disabled on local and remote hosts) " +
"[addrs=" + addrs + ']');
+ if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+ X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
+ IgniteSpiOperationTimeoutException.class)) {
+ LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+ "cluster [" +
+ "rmtNode=" + node +
+ ", err=" + errs +
+ ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+
+ getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+ "rmtNode=" + node +
+ ", errs=" + errs +
+ ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+ }
+
throw errs;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 9a1261c..35f0908 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -50,6 +50,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
@@ -100,6 +101,7 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -166,6 +168,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
protected MessageWorker msgWorker;
+ /** Force fail message for local node. */
+ private TcpDiscoveryNodeFailedMessage forceFailMsg;
+
/** */
@GridToStringExclude
private int joinCnt;
@@ -450,6 +455,8 @@ class ClientImpl extends TcpDiscoveryImpl {
msg.warning(warning);
+ msg.force(true);
+
msgWorker.addMessage(msg);
}
}
@@ -1396,6 +1403,14 @@ class ClientImpl extends TcpDiscoveryImpl {
else
leaveLatch.countDown();
}
+ else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
+ ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
+ TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
+
+ assert msg0.force() : msg0;
+
+ forceFailMsg = msg0;
+ }
else if (msg instanceof SocketClosedMessage) {
if (((SocketClosedMessage)msg).sock == currSock) {
currSock = null;
@@ -1412,25 +1427,45 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else {
- if (log.isDebugEnabled())
- log.debug("Connection closed, will try to restore connection.");
+ if (forceFailMsg != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Connection closed, local node received force fail message, " +
+ "will not try to restore connection");
+ }
+
+ queue.addFirst(SPI_RECONNECT_FAILED);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Connection closed, will try to restore connection.");
- assert reconnector == null;
+ assert reconnector == null;
- final Reconnector reconnector = new Reconnector(join);
- this.reconnector = reconnector;
- reconnector.start();
+ final Reconnector reconnector = new Reconnector(join);
+ this.reconnector = reconnector;
+ reconnector.start();
+ }
}
}
}
else if (msg == SPI_RECONNECT_FAILED) {
- reconnector.cancel();
- reconnector.join();
+ if (reconnector != null) {
+ reconnector.cancel();
+ reconnector.join();
- reconnector = null;
+ reconnector = null;
+ }
+ else
+ assert forceFailMsg != null;
if (spi.isClientReconnectDisabled()) {
if (state != SEGMENTED && state != STOPPED) {
+ if (forceFailMsg != null) {
+ U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " +
+ "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
+ ", msg=" + forceFailMsg.warning() + ']');
+ }
+
if (log.isDebugEnabled()) {
log.debug("Failed to restore closed connection, reconnect disabled, " +
"local node segmented [networkTimeout=" + spi.netTimeout + ']');
@@ -1445,7 +1480,9 @@ class ClientImpl extends TcpDiscoveryImpl {
if (state == STARTING || state == CONNECTED) {
if (log.isDebugEnabled()) {
log.debug("Failed to restore closed connection, will try to reconnect " +
- "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']');
+ "[networkTimeout=" + spi.netTimeout +
+ ", joinTimeout=" + spi.joinTimeout +
+ ", failMsg=" + forceFailMsg + ']');
}
state = DISCONNECTED;
@@ -1468,7 +1505,36 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID newId = UUID.randomUUID();
- if (log.isInfoEnabled()) {
+ if (forceFailMsg != null) {
+ long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
+ 10_000);
+
+ if (delay > 0) {
+ U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
+ "will try to reconnect with new id after " + delay + "ms (reconnect delay " +
+ "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " +
+ "property) [" +
+ "newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode +
+ ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
+ ", msg=" + forceFailMsg.warning() + ']');
+
+ Thread.sleep(delay);
+ }
+ else {
+ U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
+ "will try to reconnect with new id [" +
+ "newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode +
+ ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
+ ", msg=" + forceFailMsg.warning() + ']');
+ }
+
+ forceFailMsg = null;
+ }
+ else if (log.isInfoEnabled()) {
log.info("Client node disconnected from cluster, will try to reconnect with new id " +
"[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 40da281..f33566c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -775,6 +775,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg.warning(warning);
+ msg.force(true);
+
msgWorker.addMessage(msg);
}
}
@@ -4610,8 +4612,12 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
boolean contains;
+ UUID creatorId = msg.creatorNodeId();
+
+ assert creatorId != null : msg;
+
synchronized (mux) {
- contains = failedNodes.containsKey(sndNode);
+ contains = failedNodes.containsKey(sndNode) || ring.node(creatorId) == null;
}
if (contains) {
@@ -4623,25 +4629,29 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- UUID nodeId = msg.failedNodeId();
+ UUID failedNodeId = msg.failedNodeId();
long order = msg.order();
- TcpDiscoveryNode node = ring.node(nodeId);
+ TcpDiscoveryNode failedNode = ring.node(failedNodeId);
- if (node != null && node.internalOrder() != order) {
+ if (failedNode != null && failedNode.internalOrder() != order) {
if (log.isDebugEnabled())
log.debug("Ignoring node failed message since node internal order does not match " +
- "[msg=" + msg + ", node=" + node + ']');
+ "[msg=" + msg + ", node=" + failedNode + ']');
return;
}
- if (node != null) {
- assert !node.isLocal() || !msg.verified() : msg;
+ if (failedNode != null) {
+ assert !failedNode.isLocal() || !msg.verified() : msg;
- synchronized (mux) {
- if (!failedNodes.containsKey(node))
- failedNodes.put(node, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId());
+ boolean skipUpdateFailedNodes = msg.force() && !msg.verified();
+
+ if (!skipUpdateFailedNodes) {
+ synchronized (mux) {
+ if (!failedNodes.containsKey(failedNode))
+ failedNodes.put(failedNode, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId());
+ }
}
}
else {
@@ -4668,11 +4678,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified()) {
- node = ring.removeNode(nodeId);
+ failedNode = ring.removeNode(failedNodeId);
- interruptPing(node);
+ interruptPing(failedNode);
- assert node != null;
+ assert failedNode != null;
long topVer;
@@ -4698,16 +4708,18 @@ class ServerImpl extends TcpDiscoveryImpl {
}
synchronized (mux) {
- failedNodes.remove(node);
+ failedNodes.remove(failedNode);
- leavingNodes.remove(node);
+ leavingNodes.remove(failedNode);
- failedNodesMsgSent.remove(node.id());
+ failedNodesMsgSent.remove(failedNode.id());
- ClientMessageWorker worker = clientMsgWorkers.remove(node.id());
+ if (!msg.force()) { // ClientMessageWorker will stop after sending force fail message.
+ ClientMessageWorker worker = clientMsgWorkers.remove(failedNode.id());
- if (worker != null)
- worker.interrupt();
+ if (worker != null)
+ worker.interrupt();
+ }
}
if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId())) {
@@ -4719,10 +4731,10 @@ class ServerImpl extends TcpDiscoveryImpl {
}
synchronized (mux) {
- joiningNodes.remove(node.id());
+ joiningNodes.remove(failedNode.id());
}
- notifyDiscovery(EVT_NODE_FAILED, topVer, node);
+ notifyDiscovery(EVT_NODE_FAILED, topVer, failedNode);
spi.stats.onNodeFailed();
}
@@ -6317,7 +6329,12 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.failureDetectionTimeout() : spi.getSocketTimeout());
}
- success = true;
+ boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
+ ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(clientNodeId);
+
+ assert !clientFailed || msg.force() : msg;
+
+ success = !clientFailed;
}
catch (IgniteCheckedException | IOException e) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 783a113..e982b2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -48,6 +48,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
/** */
protected static final int CLIENT_ACK_FLAG_POS = 4;
+ /** Position of the force-fail flag (see {@link #force()}). */
+ protected static final int FORCE_FAIL_FLAG_POS = 8;
+
/** Sender of the message (transient). */
private transient UUID sndNodeId;
@@ -205,6 +208,24 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
}
/**
+ * Gets the force fail node flag.
+ *
+ * @return {@code True} if this message was marked as forced via {@link #force(boolean)}.
+ */
+ public boolean force() {
+ return getFlag(FORCE_FAIL_FLAG_POS);
+ }
+
+ /**
+ * Sets the force fail node flag (stored at {@link #FORCE_FAIL_FLAG_POS}).
+ *
+ * @param force {@code True} to mark this message as a forced node failure.
+ */
+ public void force(boolean force) {
+ setFlag(FORCE_FAIL_FLAG_POS, force);
+ }
+
+ /**
* @return Pending message index.
*/
public short pendingIndex() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
new file mode 100644
index 0000000..d29231e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
+/**
+ * Tests that TcpCommunicationSpi drops a node from topology when a connection to it cannot be established.
+ */
+public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Nodes count. */
+ private static final int NODES_CNT = 4;
+
+ /** When {@code true}, connections matching {@link #pred} are redirected to 127.0.0.1:47200 (no server is started there). */
+ private static volatile boolean block;
+
+ /** Selects blocked connections by (local node, remote node); assigned at the start of each test. */
+ private static IgniteBiPredicate<ClusterNode, ClusterNode> pred;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClockSyncFrequency(300000);
+ cfg.setFailureDetectionTimeout(1000);
+
+ TestCommunicationSpi spi = new TestCommunicationSpi();
+
+ spi.setIdleConnectionTimeout(100);
+ spi.setSharedMemoryPort(-1);
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi();
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setCommunicationSpi(spi);
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ block = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOneNode() throws Exception {
+ pred = new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+ @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) {
+ return block && rmtNode.order() == 3;
+ }
+ };
+
+ startGrids(NODES_CNT);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event event) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+
+ U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+ block = true;
+
+ grid(0).compute().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return grid(3).cluster().topologyVersion() == NODES_CNT + 1;
+ }
+ }, 5000));
+
+ for (int i = 0; i < 10; i++) {
+ U.sleep(1000);
+
+ assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+
+ int liveNodesCnt = 0;
+
+ for (int j = 0; j < NODES_CNT; j++) {
+ IgniteEx ignite;
+
+ try {
+ ignite = grid(j);
+
+ log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
+
+ ClusterNode locNode = ignite.localNode();
+
+ if (locNode.order() != 3) {
+ assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
+
+ for (ClusterNode node : ignite.cluster().nodes())
+ assertTrue(node.order() != 3);
+
+ liveNodesCnt++;
+ }
+ }
+ catch (Exception e) {
+ log.info("Checking topology for grid(" + j + "): no grid in topology.");
+ }
+ }
+
+ assertEquals(NODES_CNT - 1, liveNodesCnt);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTwoNodesEachOther() throws Exception {
+ pred = new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+ @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) {
+ return block && (locNode.order() == 2 || locNode.order() == 4) &&
+ (rmtNode.order() == 2 || rmtNode.order() == 4);
+ }
+ };
+
+ startGrids(NODES_CNT);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event event) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+
+ U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+ block = true;
+
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Void> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ barrier.await();
+
+ grid(1).compute().withNoFailover().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Void> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ barrier.await();
+
+ grid(3).compute().withNoFailover().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ return null;
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return grid(2).cluster().nodes().size() == NODES_CNT - 1;
+ }
+ }, 5000));
+
+ try {
+ fut1.get();
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ }
+
+ try {
+ fut2.get();
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ }
+
+ long failedNodeOrder = 1 + 2 + 3 + 4;
+
+ for (ClusterNode node : grid(0).cluster().nodes())
+ failedNodeOrder -= node.order();
+
+ for (int i = 0; i < 10; i++) {
+ U.sleep(1000);
+
+ assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+
+ int liveNodesCnt = 0;
+
+ for (int j = 0; j < NODES_CNT; j++) {
+ IgniteEx ignite;
+
+ try {
+ ignite = grid(j);
+
+ log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
+
+ ClusterNode locNode = ignite.localNode();
+
+ if (locNode.order() != failedNodeOrder) {
+ assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
+
+ for (ClusterNode node : ignite.cluster().nodes())
+ assertTrue(node.order() != failedNodeOrder);
+
+ liveNodesCnt++;
+ }
+ }
+ catch (Exception e) {
+ log.info("Checking topology for grid(" + j + "): no grid in topology.");
+ }
+ }
+
+ assertEquals(NODES_CNT - 1, liveNodesCnt);
+ }
+ }
+
+ /**
+ * Communication SPI that rewrites a remote node's address attributes to 127.0.0.1:47200 when {@code pred} matches.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ if (pred.apply(getLocalNode(), node)) {
+ Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+ attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+ attrs.put(createAttributeName(ATTR_PORT), 47200);
+ attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+ attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+ ((TcpDiscoveryNode)node).setAttributes(attrs);
+ }
+
+ return super.createTcpClient(node);
+ }
+
+ /**
+ * @param name Name.
+ */
+ private String createAttributeName(String name) {
+ return getClass().getSimpleName() + '.' + name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
new file mode 100644
index 0000000..6e99487
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
+/**
+ * Tests that faulty client will be failed if connection can't be established.
+ */
+public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Matches the node (order 3) whose connections are redirected to 127.0.0.1:47200 while {@link #block} is set. */
+ private static final IgnitePredicate<ClusterNode> PRED = new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return block && node.order() == 3;
+ }
+ };
+
+ /** Client mode flag applied to grids started next. */
+ private static boolean clientMode;
+
+ /** When {@code true}, connections to nodes matching {@link #PRED} are redirected. */
+ private static volatile boolean block;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClockSyncFrequency(300000);
+ cfg.setFailureDetectionTimeout(1000);
+ cfg.setClientMode(clientMode);
+
+ TestCommunicationSpi spi = new TestCommunicationSpi();
+
+ spi.setIdleConnectionTimeout(100);
+ spi.setSharedMemoryPort(-1);
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+ discoSpi.setClientReconnectDisabled(true);
+
+ cfg.setCommunicationSpi(spi);
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ block = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoServerOnHost() throws Exception {
+ testFailClient(null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNotAcceptedConnection() throws Exception {
+ testFailClient(new FakeServer());
+ }
+
+ /**
+ * @param srv Fake server listening on 127.0.0.1:47200, or {@code null} to leave that port without a server.
+ * @throws Exception If failed.
+ */
+ private void testFailClient(FakeServer srv) throws Exception {
+ IgniteInternalFuture<Long> fut = null;
+
+ try {
+ if (srv != null)
+ fut = GridTestUtils.runMultiThreadedAsync(srv, 1, "fake-server");
+
+ clientMode = false;
+
+ startGrids(2);
+
+ clientMode = true;
+
+ startGrid(2);
+ startGrid(3);
+
+ U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event event) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+
+ block = true;
+
+ try {
+ grid(0).compute(grid(0).cluster().forClients()).withNoFailover().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+ }
+ catch (IgniteException e) {
+ // No-op.
+ }
+
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return grid(0).cluster().forClients().nodes().size() == 1;
+ }
+ }, 5000));
+
+ for (int i = 0; i < 5; i++) {
+ U.sleep(1000);
+
+ log.info("Check topology (" + (i + 1) + "): " + grid(0).cluster().nodes());
+
+ assertEquals(1, grid(0).cluster().forClients().nodes().size());
+ }
+ }
+ finally {
+ if (srv != null) {
+ srv.stop();
+
+ assert fut != null;
+
+ fut.get();
+ }
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Server that emulates connection troubles: it binds the port but never calls accept().
+ */
+ private static class FakeServer implements Runnable {
+ /** Server. */
+ private final ServerSocket srv;
+
+ /** Stop. */
+ private volatile boolean stop;
+
+ /**
+ * Creates the server and binds its socket to 127.0.0.1:47200 (throws {@link IOException} if binding fails).
+ */
+ FakeServer() throws IOException {
+ this.srv = new ServerSocket(47200, 50, InetAddress.getByName("127.0.0.1"));
+ }
+
+ /**
+ * Signals {@link #run()} to exit its loop and close the server socket.
+ */
+ public void stop() {
+ stop = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ while (!stop) {
+ try {
+ U.sleep(10);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ break; // Interrupted: exit the loop instead of ignoring the interrupt.
+ }
+ }
+ }
+ finally {
+ U.closeQuiet(srv);
+ }
+ }
+ }
+
+ /**
+ * Communication SPI that redirects connections to nodes matching {@code PRED} to 127.0.0.1:47200.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ if (PRED.apply(node)) {
+ Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+ attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+ attrs.put(createAttributeName(ATTR_PORT), 47200);
+ attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+ attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+ ((TcpDiscoveryNode)node).setAttributes(attrs);
+ }
+
+ return super.createTcpClient(node);
+ }
+
+ /**
+ * @param name Name.
+ */
+ private String createAttributeName(String name) {
+ return getClass().getSimpleName() + '.' + name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index e0e8eba..6365443 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -82,6 +82,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
private void initAttributes() {
attrs.put(IgniteNodeAttributes.ATTR_BUILD_VER, "10");
attrs.put(IgniteNodeAttributes.ATTR_GRID_NAME, "null");
+ // Explicitly mark the test node as not running in client mode.
+ attrs.put(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 9f507e6..ffaec4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -102,6 +102,7 @@ import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -173,6 +174,7 @@ public abstract class GridAbstractTest extends TestCase {
static {
System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000");
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
+ System.setProperty(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, "1");
if (BINARY_MARSHALLER)
GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index c557fbb..ddc2551 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -35,6 +35,8 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpFailure
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpNoDelayOffSelfTest;
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTest;
import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
/**
* Test suite for all communication SPIs.
@@ -72,6 +74,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
+ suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class));
+ suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 7e0d20b..9f56877 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -633,7 +633,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
assertEquals(2, qry.getAll().size());
- Throwable throwable = GridTestUtils.assertThrowsInherited(log, new GridPlainCallable<Void>() {
+ GridTestUtils.assertThrows(log, new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
QueryCursor<Cache.Entry<Integer, Type1>> qry =
cache.query(new SqlQuery<Integer, Type1>(Type1.class, "FROM Type1"));
@@ -642,11 +642,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
return null;
}
- }, RuntimeException.class, null);
-
- assertNotNull(throwable);
-
- assertTrue(throwable instanceof IgniteException || throwable instanceof CacheException);
+ }, CacheException.class, null);
}
/**