You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/19 04:06:51 UTC
incubator-ignite git commit: IGNITE-1034 - Drop slow clients.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1034 ad0a026f4 -> 44bbeceae
IGNITE-1034 - Drop slow clients.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/44bbecea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/44bbecea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/44bbecea
Branch: refs/heads/ignite-1034
Commit: 44bbeceae657058eee322b13e339966c625802db
Parents: ad0a026
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 18 19:06:47 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 18 19:06:47 2015 -0700
----------------------------------------------------------------------
.../internal/managers/GridManagerAdapter.java | 4 +
.../discovery/GridDiscoveryManager.java | 15 +++
.../ignite/internal/util/nio/GridNioServer.java | 51 ++++++++
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 5 +
.../org/apache/ignite/spi/IgniteSpiContext.java | 5 +
.../communication/tcp/TcpCommunicationSpi.java | 73 +++++++++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 5 +
.../IgniteSlowClientDetectionSelfTest.java | 124 +++++++++++++++++++
.../testframework/GridSpiTestContext.java | 5 +
9 files changed, 287 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index bea4256..885d52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -484,6 +484,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.discovery().tryFailNode(nodeId);
}
+ /** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId) {
+ // Delegate the request to the discovery manager.
+ ctx.discovery().failNode(nodeId);
+ }
+
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 464110c..717cdf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1502,6 +1502,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Asks the discovery SPI to fail the node with the given ID.
+ * <p>
+ * If the busy lock cannot be entered, the request is silently dropped.
+ *
+ * @param nodeId Node ID to fail.
+ */
+ public void failNode(UUID nodeId) {
+ if (busyLock.enterBusy()) {
+ try {
+ getSpi().failNode(nodeId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ }
+
+ /**
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 88fad71..b9d246a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -116,6 +116,10 @@ public class GridNioServer<T> {
@SuppressWarnings("UnusedDeclaration")
private boolean skipWrite;
+ /**
+ * For test purposes only: when set, {@code processRead()} sleeps briefly and returns without reading.
+ * Volatile because tests set it reflectively from another thread while selector threads keep polling it.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ private volatile boolean skipRead;
+
/** Local address. */
private final InetSocketAddress locAddr;
@@ -145,6 +149,9 @@ public class GridNioServer<T> {
@GridToStringExclude
private IgnitePredicate<Message> skipRecoveryPred;
+ /**
+ * Optional listener to monitor outbound message queue size. When non-null it is called with the session
+ * and its message count after a message is queued for sending; {@code null} disables monitoring.
+ */
+ private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+
/** Static initializer ensures single-threaded execution of workaround. */
static {
// This is a workaround for JDK bug (NPE in Selector.open()).
@@ -174,6 +181,7 @@ public class GridNioServer<T> {
* @param metricsLsnr Metrics listener.
* @param formatter Message formatter.
* @param skipRecoveryPred Skip recovery predicate.
+ * @param msgQueueLsnr Message queue size listener.
* @param filters Filters for this server.
* @throws IgniteCheckedException If failed.
*/
@@ -195,6 +203,7 @@ public class GridNioServer<T> {
GridNioMetricsListener metricsLsnr,
MessageFormatter formatter,
IgnitePredicate<Message> skipRecoveryPred,
+ IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
GridNioFilter... filters
) throws IgniteCheckedException {
A.notNull(addr, "addr");
@@ -215,6 +224,7 @@ public class GridNioServer<T> {
this.sockRcvBuf = sockRcvBuf;
this.sockSndBuf = sockSndBuf;
this.sndQueueLimit = sndQueueLimit;
+ this.msgQueueLsnr = msgQueueLsnr;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
@@ -385,6 +395,11 @@ public class GridNioServer<T> {
else if (msgCnt == 1)
// Change from 0 to 1 means that worker thread should be waken up.
clientWorkers.get(ses.selectorIndex()).offer(fut);
+
+ IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
+
+ if (lsnr0 != null)
+ lsnr0.apply(ses, msgCnt);
}
/**
@@ -634,6 +649,17 @@ public class GridNioServer<T> {
* @throws IOException If key read failed.
*/
@Override protected void processRead(SelectionKey key) throws IOException {
+ if (skipRead) {
+ try {
+ U.sleep(50);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ U.warn(log, "Sleep has been interrupted.");
+ }
+
+ return;
+ }
+
ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
@@ -775,6 +801,17 @@ public class GridNioServer<T> {
* @throws IOException If key read failed.
*/
@Override protected void processRead(SelectionKey key) throws IOException {
+ if (skipRead) {
+ try {
+ U.sleep(50);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ U.warn(log, "Sleep has been interrupted.");
+ }
+
+ return;
+ }
+
ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
@@ -2108,6 +2145,9 @@ public class GridNioServer<T> {
/** Skip recovery predicate. */
private IgnitePredicate<Message> skipRecoveryPred;
+ /** Message queue size listener passed to the built server; {@code null} (default) means no monitoring. */
+ private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+
/**
* Finishes building the instance.
*
@@ -2133,6 +2173,7 @@ public class GridNioServer<T> {
metricsLsnr,
formatter,
skipRecoveryPred,
+ msgQueueLsnr,
filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
);
@@ -2345,5 +2386,15 @@ public class GridNioServer<T> {
return this;
}
+
+ /**
+ * Sets an optional listener notified with a session and its outbound message queue size.
+ * Passing {@code null} (the default) disables notifications.
+ *
+ * @param msgQueueLsnr Message queue size listener, may be {@code null}.
+ * @return Instance of this builder for chaining.
+ */
+ public Builder<T> messageQueueSizeListener(IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr) {
+ this.msgQueueLsnr = msgQueueLsnr;
+
+ return this;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 476f8a8..18191a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -759,6 +759,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
}
/** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId) {
+ // No-op: node failure requests are ignored by this context.
+ }
+
+ /** {@inheritDoc} */
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
assert ignite instanceof IgniteKernal : ignite;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index f83326c..a655a73 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -312,6 +312,11 @@ public interface IgniteSpiContext {
public boolean tryFailNode(UUID nodeId);
/**
+ * Requests that the node with the given ID be failed.
+ * <p>
+ * Unlike {@link #tryFailNode(UUID)}, this method returns no result. Some implementations
+ * (e.g. stub or test contexts) may ignore the request.
+ *
+ * @param nodeId ID of the node to fail.
+ */
+ public void failNode(UUID nodeId);
+
+ /**
* @param c Timeout object.
*/
public void addTimeoutObject(IgniteSpiTimeoutObject c);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 87d5b65..538e9a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.ipc.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
@@ -610,6 +611,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@LoggerResource
private IgniteLogger log;
+ /**
+ * Injected Ignite instance; used to check whether the local node runs in client mode
+ * (slow client monitoring is only installed on non-client nodes).
+ * NOTE(review): dereferenced without a null check when creating the NIO server — confirm it is
+ * always injected, including in SPI-level unit tests.
+ */
+ @IgniteInstanceResource
+ @GridToStringExclude
+ private Ignite ignite;
+
/** Local IP address. */
private String locAddr;
@@ -653,6 +659,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Message queue limit. */
private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
+ /**
+ * Slow client queue limit. Slow client detection is active only when this is positive
+ * (the default is {@code 0}, i.e. disabled).
+ */
+ private int slowClientQueueLimit;
+
/** Min buffered message count. */
private int minBufferedMsgCnt = Integer.getInteger(IGNITE_MIN_BUFFERED_COMMUNICATION_MSG_CNT, 512);
@@ -1145,6 +1154,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * Gets slow client queue limit.
+ * <p/>
+ * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+ * those clients whose queue exceeded this limit. A value of {@code 0} (the default) or a negative value
+ * disables slow client detection. The limit is only enforced by server (non-client) nodes and should be
+ * less than the message queue limit, otherwise it has no effect.
+ *
+ * @return Slow client queue limit.
+ */
+ public int getSlowClientQueueLimit() {
+ return slowClientQueueLimit;
+ }
+
+ /**
+ * Sets slow client queue limit.
+ * <p/>
+ * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+ * those clients whose queue exceeded this limit. A value of {@code 0} (the default) or a negative value
+ * disables slow client detection. The limit is only enforced by server (non-client) nodes and should be
+ * less than the message queue limit, otherwise it has no effect.
+ *
+ * @param slowClientQueueLimit Slow client queue limit.
+ */
+ public void setSlowClientQueueLimit(int slowClientQueueLimit) {
+ this.slowClientQueueLimit = slowClientQueueLimit;
+ }
+
+ /**
* Sets the minimum number of messages for this SPI, that are buffered
* prior to sending.
* <p>
@@ -1315,6 +1348,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
"since may produce significant delays with some scenarios.");
+ if (slowClientQueueLimit > 0 && msgQueueLimit > 0) {
+ if (slowClientQueueLimit >= msgQueueLimit) {
+ U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit. " +
+ "Slow client queue limit will have no effect.");
+ }
+ }
+
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
if (shmemSrv != null) {
@@ -1425,6 +1465,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
+ IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
+ !ignite.configuration().isClientMode() && slowClientQueueLimit > 0 ?
+ new CI2<GridNioSession, Integer>() {
+ @Override public void apply(GridNioSession ses, Integer qSize) {
+ checkClientQueueSize(ses, qSize);
+ }
+ } :
+ null;
+
GridNioServer<Message> srvr =
GridNioServer.<Message>builder()
.address(locHost)
@@ -1446,6 +1495,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
new GridConnectionBytesVerifyFilter(log))
.messageFormatter(msgFormatter)
.skipRecoveryPredicate(skipRecoveryPred)
+ .messageQueueSizeListener(queueSizeMonitor)
.build();
boundTcpPort = port;
@@ -1860,6 +1910,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * Checks client message queue size and initiates client drop if message queue size exceeds the configured limit.
+ * <p>
+ * Only client nodes are dropped. Sessions without a node ID, or whose node is not known to the SPI
+ * context, are ignored.
+ *
+ * @param ses Node communication session.
+ * @param msgQueueSize Message queue size.
+ */
+ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
+ if (slowClientQueueLimit <= 0 || msgQueueSize <= slowClientQueueLimit)
+ return;
+
+ UUID id = ses.meta(NODE_ID_META);
+
+ if (id == null)
+ return;
+
+ ClusterNode node = getSpiContext().node(id);
+
+ if (node != null && node.isClient()) {
+ // Queue size is deliberately left out of the text so that LT (presumably keyed on the message)
+ // can throttle repeated warnings for the same node.
+ LT.warn(log, null, "Client node outbound queue size exceeded configured slow client queue limit, " +
+ "will fail the node (consider changing 'slowClientQueueLimit'): " + node);
+
+ getSpiContext().failNode(id);
+ }
+ }
+
+ /**
* Establish TCP connection to remote node and returns client.
*
* @param node Remote node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b743a1a..8eb82ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3402,6 +3402,11 @@ class ServerImpl extends TcpDiscoveryImpl {
failedNodes.remove(node);
leavingNodes.remove(node);
+
+ ClientMessageWorker worker = clientMsgWorkers.remove(node.id());
+
+ if (worker != null)
+ worker.interrupt();
}
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
new file mode 100644
index 0000000..09b4215
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.event.*;
+
+/**
+ *
+ */
+public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
+
+ public static final String PARTITIONED = "partitioned";
+
+ /**
+ * @return Node count.
+ */
+ private int nodeCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
+ cfg.setClientMode(true);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSlowClientQueueLimit(50);
+ commSpi.setSharedMemoryPort(-1);
+ commSpi.setIdleConnectionTimeout(300_000);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(nodeCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSlowClient() throws Exception {
+ final IgniteEx slowClient = grid(nodeCount() - 1);
+
+ assertTrue(slowClient.cluster().localNode().isClient());
+
+ IgniteCache<Object, Object> cache = slowClient.getOrCreateCache(PARTITIONED);
+
+ IgniteEx client0 = grid(nodeCount() - 2);
+
+ assertTrue(client0.cluster().localNode().isClient());
+
+ IgniteCache<Object, Object> cache0 = client0.getOrCreateCache(PARTITIONED);
+
+ cache.query(new ContinuousQuery<>().setLocalListener(new Listener()));
+
+ for (int i = 0; i < 100; i++)
+ cache0.put(0, i);
+
+ GridIoManager ioMgr = slowClient.context().io();
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
+
+ GridNioServer nioSrvr = U.field(commSpi, "nioSrvr");
+
+ GridTestUtils.setFieldValue(nioSrvr, "skipRead", true);
+
+ // Initiate messages for client.
+ for (int i = 0; i < 100; i++)
+ cache0.put(0, new byte[10 * 1024]);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return Ignition.state(slowClient.name()) == IgniteState.STOPPED_ON_SEGMENTATION;
+ }
+ }, getTestTimeout());
+ }
+
+ private static class Listener implements CacheEntryUpdatedListener<Object, Object> {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ System.out.println(">>>> Received update: " + iterable);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 21f9424..c20ff2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -502,6 +502,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId) {
+ // No-op: the test SPI context ignores node failure requests.
+ }
+
+ /** {@inheritDoc} */
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
// No-op.
}