You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/04/10 09:52:44 UTC
[2/2] ignite git commit: IGNITE-7944: Disconnected client node tries to send JOB_CANCEL message. Applied fix:
- Skip sending message if client disconnected;
- Throw IgniteCheckedException if a client node is disconnected and communication client is null.
IGNITE-7944: Disconnected client node tries to send JOB_CANCEL message. Applied fix:
- Skip sending message if client disconnected;
- Throw IgniteCheckedException if a client node is disconnected and communication client is null.
This closes #3737.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82a4c024
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82a4c024
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82a4c024
Branch: refs/heads/ignite-2.5
Commit: 82a4c024fe06ef8c8deeaf762f0cc20a8e481252
Parents: b096a46
Author: Roman Guseinov <gr...@gmail.com>
Authored: Mon Apr 9 14:45:44 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Apr 10 12:49:38 2018 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskProcessor.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 26 +-
.../TcpCommunicationSpiSkipMessageSendTest.java | 414 +++++++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 3 +
4 files changed, 442 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/82a4c024/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d27e116..2f0aa7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -171,7 +171,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut);
for (GridTaskWorker<?, ?> worker : tasks.values())
- worker.finishTask(null, err);
+ worker.finishTask(null, err, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/82a4c024/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a3fccbc..4a0710e 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -65,6 +65,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -2656,6 +2657,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isTraceEnabled())
log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
+ if (isLocalNodeDisconnected()) {
+ throw new IgniteSpiException("Failed to send a message to remote node because local node has " +
+ "been disconnected [rmtNodeId=" + node.id() + ']');
+ }
+
ClusterNode locNode = getLocalNode();
if (locNode == null)
@@ -2709,6 +2715,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
+ * Tells whether the local node is currently disconnected from the cluster.
+ * <p>
+ * The state is taken from the kernal context's {@code clientDisconnected()} flag. If {@code ignite}
+ * is not an {@link IgniteKernal} (including when it is {@code null}), the node is treated as connected.
+ *
+ * @return {@code True} if local node in disconnected state.
+ */
+ private boolean isLocalNodeDisconnected() {
+ return ignite instanceof IgniteKernal && ((IgniteKernal)ignite).context().clientDisconnected();
+ }
+
+ /**
* @param nodeId Node ID.
* @param rmvClient Client to remove.
* @return {@code True} if client was removed.
@@ -2853,8 +2871,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
client = fut.get();
- if (client == null)
- continue;
+ if (client == null) {
+ if (isLocalNodeDisconnected())
+ throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
+ else
+ continue;
+ }
if (getSpiContext().node(nodeId) == null) {
if (removeNodeClient(nodeId, client))
http://git-wip-us.apache.org/repos/asf/ignite/blob/82a4c024/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
new file mode 100644
index 0000000..c4bc8f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests that the client will be segmented in time and won't hang due to canceling compute jobs.
+ * <p>
+ * A client starts a compute job that never finishes. Then both its discovery and communication SPIs
+ * are switched into a "network disabled" mode. The test expects the client to be disconnected and
+ * then segmented within the configured timeouts.
+ */
+public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTest {
+ /**
+ * Released by the compute job once it starts executing. It is re-created at the beginning of every
+ * test run, because a static latch released by a previous run in the same JVM would never block again.
+ */
+ private static volatile CountDownLatch computeJobStarted = new CountDownLatch(1);
+
+ /** Client failure detection timeout (ms). */
+ private static final long FAILURE_DETECTION_TIMEOUT = 10000;
+
+ /** Client discovery join timeout (ms). */
+ private static final long JOIN_TIMEOUT = 10000;
+
+ /** Maximum time to wait for the compute job to start (ms). */
+ private static final long START_JOB_TIMEOUT = 10000;
+
+ /** Delay before the client's network is disabled (ms). */
+ private static final long DISABLE_NETWORK_DELAY = 2000;
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 2 * 60 * 1000;
+ }
+
+ /**
+ * Checks that a client running a compute job is disconnected and then segmented
+ * after its network has been disabled.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientSegmented() throws Exception {
+ // The latch is static: re-create it so that a previous run cannot leave it already released.
+ computeJobStarted = new CountDownLatch(1);
+
+ Ignite server = null;
+ Ignite client = null;
+
+ try {
+ server = Ignition.start(getConfig(false));
+
+ final CountDownLatch clientDisconnected = new CountDownLatch(1);
+ final CountDownLatch clientSegmented = new CountDownLatch(1);
+
+ client = startClient(clientDisconnected, clientSegmented);
+
+ final IgniteCompute compute = client.compute();
+
+ runJobAsync(compute);
+
+ if (!computeJobStarted.await(START_JOB_TIMEOUT, TimeUnit.MILLISECONDS))
+ fail("Compute job wasn't started.");
+
+ disableNetwork(client);
+
+ if (!clientDisconnected.await(FAILURE_DETECTION_TIMEOUT * 3, TimeUnit.MILLISECONDS))
+ fail("Client wasn't disconnected.");
+
+ if (!clientSegmented.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS))
+ fail("Client wasn't segmented.");
+ }
+ finally {
+ // Stop the server even if closing the client throws, so no node leaks into later tests.
+ try {
+ if (client != null)
+ client.close();
+ }
+ finally {
+ if (server != null)
+ server.close();
+ }
+ }
+ }
+
+ /**
+ * Simulate network disabling: after a short delay, switch both the discovery and communication SPIs
+ * of the given node into "network disabled" mode and wait until discovery observes it.
+ *
+ * @param ignite Ignite instance.
+ * @throws IgniteInterruptedCheckedException If thread sleep interrupted.
+ * @throws InterruptedException If waiting for network disabled failed (interrupted).
+ */
+ private void disableNetwork(Ignite ignite) throws IgniteInterruptedCheckedException, InterruptedException {
+ U.sleep(DISABLE_NETWORK_DELAY);
+
+ CustomCommunicationSpi communicationSpi = (CustomCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ CustomDiscoverySpi discoverySpi = (CustomDiscoverySpi)ignite.configuration().getDiscoverySpi();
+
+ discoverySpi.disableNetwork();
+
+ communicationSpi.disableNetwork();
+
+ if (!discoverySpi.awaitNetworkDisabled(FAILURE_DETECTION_TIMEOUT * 2))
+ fail("Network wasn't disabled.");
+ }
+
+ /**
+ * Starts a never-ending compute job from a separate thread, so that the test thread is not blocked.
+ * <p>
+ * The job body waits on a latch that is never released, so {@code compute.call(...)} can only
+ * return by throwing. Such an exception is logged.
+ *
+ * @param compute Ignite compute instance.
+ */
+ private void runJobAsync(final IgniteCompute compute) {
+ Thread runner = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ compute.call(new IgniteCallable<Integer>() {
+ @Override public Integer call() throws Exception {
+ computeJobStarted.countDown();
+
+ // Simulate long-running job: this latch is never released.
+ new CountDownLatch(1).await();
+
+ return null;
+ }
+ });
+ }
+ catch (Exception e) {
+ log.warning("Compute job finished with an exception.", e);
+ }
+ }
+ }, "compute-job-runner");
+
+ // Do not let a stuck compute call keep the JVM alive.
+ runner.setDaemon(true);
+
+ runner.start();
+ }
+
+ /**
+ * Create Communication Spi instance.
+ *
+ * @param client Is a client node (additional SPI logging is enabled for clients).
+ * @return Communication Spi.
+ */
+ private TcpCommunicationSpi getCommunicationSpi(boolean client) {
+ TcpCommunicationSpi spi = new CustomCommunicationSpi(client);
+
+ spi.setName("CustomCommunicationSpi");
+
+ return spi;
+ }
+
+ /**
+ * Create Discovery Spi instance.
+ *
+ * @return Discovery Spi.
+ */
+ private TcpDiscoverySpi getDiscoverySpi() {
+ TcpDiscoverySpi spi = new CustomDiscoverySpi();
+
+ spi.setName("CustomDiscoverySpi");
+
+ spi.setIpFinder(LOCAL_IP_FINDER);
+
+ return spi;
+ }
+
+ /**
+ * Create Ignite configuration.
+ * <p>
+ * The server runs at most one parallel job (FIFO collision SPI). The client gets explicit failure
+ * detection and join timeouts.
+ *
+ * @param clientMode Client mode.
+ * @return Ignite configuration.
+ */
+ private IgniteConfiguration getConfig(boolean clientMode) {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setIgniteInstanceName(clientMode ? "client-node" : "server-node");
+
+ cfg.setClientMode(clientMode);
+
+ cfg.setCommunicationSpi(getCommunicationSpi(clientMode));
+
+ if (!clientMode) {
+ cfg.setDiscoverySpi(getDiscoverySpi());
+
+ FifoQueueCollisionSpi collisionSpi = new FifoQueueCollisionSpi();
+
+ collisionSpi.setParallelJobsNumber(1);
+
+ cfg.setCollisionSpi(collisionSpi);
+ }
+ else {
+ cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+ cfg.setDiscoverySpi(getDiscoverySpi().setJoinTimeout(JOIN_TIMEOUT));
+ }
+
+ return cfg;
+ }
+
+ /**
+ * Start client node and register a local listener for its disconnect and segmentation events.
+ *
+ * @param clientDisconnected Latch released when the client receives EVT_CLIENT_NODE_DISCONNECTED.
+ * @param clientSegmented Latch released when the client receives EVT_NODE_SEGMENTED.
+ * @return Ignite instance.
+ */
+ private Ignite startClient(final CountDownLatch clientDisconnected, final CountDownLatch clientSegmented) {
+ Ignite ignite = Ignition.start(getConfig(true));
+
+ IgnitePredicate<Event> locLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ log.info("Client node received event: " + event.name());
+
+ if (event.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED)
+ clientDisconnected.countDown();
+
+ if (event.type() == EventType.EVT_NODE_SEGMENTED)
+ clientSegmented.countDown();
+
+ return true;
+ }
+ };
+
+ ignite.events().localListen(locLsnr,
+ EventType.EVT_NODE_SEGMENTED,
+ EventType.EVT_CLIENT_NODE_DISCONNECTED);
+
+ return ignite;
+ }
+
+ /**
+ * Communication Spi that emulates connection troubles.
+ * <p>
+ * Sending a message whose string form contains {@code TOPIC_JOB_CANCEL} first drops all cached TCP
+ * clients. Once the network is disabled, creating a TCP client sleeps for the next timeout chunk
+ * and returns {@code null}.
+ */
+ class CustomCommunicationSpi extends TcpCommunicationSpi {
+ /** Network is disabled. */
+ private volatile boolean networkDisabled;
+
+ /** Additional logging is enabled. */
+ private final boolean logEnabled;
+
+ /**
+ * @param enableLogs Enable additional logging.
+ */
+ CustomCommunicationSpi(boolean enableLogs) {
+ super();
+ this.logEnabled = enableLogs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ String message = msg.toString();
+
+ if (logEnabled)
+ log.info("CustomCommunicationSpi.sendMessage: " + message);
+
+ // Drop existing connections so that sending the cancel request has to create a new client.
+ if (message.contains("TOPIC_JOB_CANCEL"))
+ closeTcpConnections();
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ if (logEnabled)
+ log.info(String.format("CustomCommunicationSpi.createTcpClient [networkDisabled=%s, node=%s]", networkDisabled, node));
+
+ if (networkDisabled) {
+ // Spend the connect timeout chunk, then fail to produce a client.
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, !node.isClient());
+
+ long timeout = timeoutHelper.nextTimeoutChunk(getConnectTimeout());
+
+ if (logEnabled)
+ log.info("CustomCommunicationSpi.createTcpClient [timeoutHelper.nextTimeoutChunk=" + timeout + "]");
+
+ sleep(timeout);
+
+ return null;
+ }
+ else
+ return super.createTcpClient(node, connIdx);
+ }
+
+ /**
+ * Simulate network disabling.
+ */
+ void disableNetwork() {
+ networkDisabled = true;
+ }
+
+ /**
+ * Close all cached communication clients, so that the next {@code sendMessage} call has to create
+ * new ones.
+ */
+ private void closeTcpConnections() {
+ final ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(this, "clients");
+
+ Set<UUID> ids = clients.keySet();
+
+ if (!ids.isEmpty()) {
+ log.info("Close TCP clients: " + ids);
+
+ for (UUID nodeId : ids) {
+ GridCommunicationClient[] clients0 = clients.remove(nodeId);
+
+ if (clients0 != null) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null)
+ client.forceClose();
+ }
+ }
+ }
+
+ log.info("TCP clients are closed.");
+ }
+ }
+ }
+
+ /**
+ * Discovery Spi that emulates connection troubles.
+ * <p>
+ * Once the network is disabled, reads sleep for the timeout and return {@code null}. Writes sleep
+ * for the timeout and then throw {@link IgniteCheckedException}.
+ */
+ class CustomDiscoverySpi extends TcpDiscoverySpi {
+ /** Network is disabled. */
+ private volatile boolean networkDisabled;
+
+ /** Released by the first write attempt made after the network has been disabled. */
+ private final CountDownLatch networkDisabledLatch = new CountDownLatch(1);
+
+ /** */
+ CustomDiscoverySpi() {
+ super();
+
+ setName("CustomDiscoverySpi");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (networkDisabled) {
+ sleep(timeout);
+
+ return null;
+ }
+ else
+ return super.readMessage(sock, in, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (networkDisabled) {
+ sleep(timeout);
+
+ networkDisabledLatch.countDown();
+
+ throw new IgniteCheckedException("CustomDiscoverySpi: network is disabled.");
+ }
+ else
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /**
+ * Simulate network disabling.
+ */
+ void disableNetwork() {
+ networkDisabled = true;
+ }
+
+ /**
+ * Wait until a write attempt has observed the disabled network.
+ *
+ * @param timeout Maximum time to wait, in milliseconds.
+ * @return {@code True} if the disabled network was observed within the timeout.
+ * @throws InterruptedException If the current thread was interrupted while waiting.
+ */
+ boolean awaitNetworkDisabled(long timeout) throws InterruptedException {
+ return networkDisabledLatch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Sleeps for the given number of milliseconds, or for 2000 ms when {@code timeout} is not positive.
+ *
+ * @param timeout Time to sleep, in milliseconds.
+ * @throws IgniteInterruptedCheckedException If current thread interrupted.
+ */
+ static void sleep(long timeout) throws IgniteInterruptedCheckedException {
+ U.sleep(timeout > 0 ? timeout : 2000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/82a4c024/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 7a4de1b..1b962bc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAck
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
/**
@@ -78,6 +79,8 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
+ suite.addTest(new TestSuite(TcpCommunicationSpiSkipMessageSendTest.class));
+
suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class));
suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class));