You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ip...@apache.org on 2019/12/23 15:17:16 UTC
[ignite] branch master updated: IGNITE-12466 Monitor query pool
starvation - Fixes #7161.
This is an automated email from the ASF dual-hosted git repository.
ipavlukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7272bb1 IGNITE-12466 Monitor query pool starvation - Fixes #7161.
7272bb1 is described below
commit 7272bb18656b53d03b1ca7433eb9d1dbb08d18ca
Author: ktkalenko <kt...@gridgain.com>
AuthorDate: Mon Dec 23 18:17:00 2019 +0300
IGNITE-12466 Monitor query pool starvation - Fixes #7161.
Signed-off-by: ipavlukhin <vo...@gmail.com>
---
.../org/apache/ignite/internal/IgniteKernal.java | 9 ++
.../query/IgniteQueryDedicatedPoolTest.java | 124 ++++++++++++++++++++-
2 files changed, 128 insertions(+), 5 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c44d2ca..0096190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1421,6 +1421,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** Last completed task count. */
private long lastCompletedCntSys;
+ /** Last completed task count of the query pool (updated from {@code checkPoolStarvation} on each run). */
+ private long lastCompletedCntQry;
+
@Override public void run() {
if (execSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
@@ -1434,6 +1437,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system");
}
+ if (qryExecSvc instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor exec = (ThreadPoolExecutor)qryExecSvc;
+
+ lastCompletedCntQry = checkPoolStarvation(exec, lastCompletedCntQry, "query");
+ }
+
if (stripedExecSvc != null)
stripedExecSvc.detectStarvation();
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
index df0fc7c..15945b4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.processors.query;
+import javax.cache.Cache;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
-import javax.cache.Cache;
+import java.util.concurrent.CyclicBarrier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
@@ -33,6 +34,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -41,27 +43,48 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingSpi;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.SystemPropertiesRule;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import static java.util.Objects.nonNull;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Ensures that SQL queries are executed in a dedicated thread pool.
*/
public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
+ /** Class rule. */
+ @ClassRule public static final TestRule classRule = new SystemPropertiesRule();
+
/** Name of the cache for test */
private static final String CACHE_NAME = "query_pool_test";
+ /** Test logger that is set as the grid logger; log listeners are registered on it by the tests. */
+ private static ListeningTestLogger testLog;
+
+ /** Query thread pool size to apply to the node configuration; when {@code null} it is not set explicitly. */
+ private Integer qryPoolSize;
+
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
- startGrid("server");
+ testLog = new ListeningTestLogger(false, log);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
+ IgniteConfiguration cfg = super.getConfiguration(gridName)
+ .setGridLogger(testLog);
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
@@ -77,6 +100,9 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
cfg.setIndexingSpi(new TestIndexingSpi());
+ if (nonNull(qryPoolSize))
+ cfg.setQueryThreadPoolSize(qryPoolSize);
+
return cfg;
}
@@ -85,6 +111,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
super.afterTest();
stopAllGrids();
+
+ testLog.clearListeners();
}
/**
@@ -93,6 +121,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
*/
@Test
public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
+ startGrid("server");
+
try (Ignite client = startGrid("client")) {
IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
@@ -122,6 +152,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
*/
@Test
public void testScanQueryUsesDedicatedThreadPool() throws Exception {
+ startGrid("server");
+
try (Ignite client = startGrid("client")) {
IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
@@ -146,6 +178,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
*/
@Test
public void testSpiQueryUsesDedicatedThreadPool() throws Exception {
+ startGrid("server");
+
try (Ignite client = startGrid("client")) {
IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME);
@@ -164,6 +198,86 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
}
/**
+ * Checks that a query pool starvation message appears in the log when
+ * {@code IGNITE_STARVATION_CHECK_INTERVAL} is set to {@code 10}.
+ * <p>
+ * The expected message mentions "no task completed in last 10ms" and the query thread pool. The check waits
+ * with a timeout of {@code 10_000} (presumably milliseconds - confirm against {@code waitForCondition}).
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "10")
+ public void testContainsStarvationQryPoolInLog() throws Exception {
+ checkStarvationQryPoolInLog(
+ 10_000,
+ "Possible thread pool starvation detected (no task completed in last 10ms, is query thread pool size " +
+ "large enough?)",
+ true
+ );
+ }
+
+ /**
+ * Checks that no thread pool starvation message appears in the log when
+ * {@code IGNITE_STARVATION_CHECK_INTERVAL} is set to {@code 0}.
+ * <p>
+ * NOTE(review): a value of {@code 0} presumably disables the periodic starvation check - confirm against
+ * the code that reads this property. The message is matched by its common prefix only, and the check waits
+ * with a timeout of {@code 1_000} (presumably milliseconds).
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "0")
+ public void testNotContainsStarvationQryPoolInLog() throws Exception {
+ checkStarvationQryPoolInLog(
+ 1_000,
+ "Possible thread pool starvation detected (no task completed in",
+ false
+ );
+ }
+
+ /**
+ * Runs concurrent slow scan queries against a node with a single-threaded query pool and checks whether
+ * the given message shows up in the log.
+ * <p>
+ * The method sets the query pool size to {@code 1}, starts the "server" and "client" nodes, puts one entry
+ * into the cache and registers a log listener matching {@code findLogMsg}. It then launches two asynchronous
+ * tasks that meet on a {@link CyclicBarrier} and each run a scan query whose filter calls {@code doSleep(500)}
+ * before accepting an entry. Finally it waits for the listener to fire and asserts the outcome according to
+ * {@code contains}.
+ * <p>
+ * NOTE(review): the results of {@code runAsync} are discarded, so a failure inside a query task is presumably
+ * not reported by this method - confirm whether that is intended.
+ *
+ * @param checkTimeout Maximum time to wait for the listener, passed to {@code waitForCondition}.
+ * @param findLogMsg Log message of interest, must not be {@code null}.
+ * @param contains {@code true} if the message is expected to be logged within the timeout,
+ * {@code false} if it is expected not to be logged.
+ * @throws Exception If failed.
+ */
+ private void checkStarvationQryPoolInLog(long checkTimeout, String findLogMsg ,boolean contains) throws Exception {
+ assertNotNull(findLogMsg);
+
+ qryPoolSize = 1;
+
+ startGrid("server");
+
+ IgniteEx clientNode = startGrid("client");
+
+ IgniteCache<Integer, Integer> cache = clientNode.cache(CACHE_NAME);
+ cache.put(0, 0);
+
+ int qrySize = 2;
+
+ CyclicBarrier barrier = new CyclicBarrier(qrySize);
+
+ LogListener logLsnr = LogListener.matches(findLogMsg).build();
+
+ testLog.registerListener(logLsnr);
+
+ for (int i = 0; i < qrySize; i++) {
+ runAsync(() -> {
+ barrier.await();
+
+ cache.query(new ScanQuery<>((o, o2) -> {
+ doSleep(500);
+
+ return true;
+ })
+ ).getAll();
+
+ return null;
+ });
+ }
+
+ if (contains)
+ assertTrue(waitForCondition(logLsnr::check, checkTimeout));
+ else
+ assertFalse(waitForCondition(logLsnr::check, checkTimeout));
+ }
+
+ /**
* Custom SQL function to return current thread name from inside query executor
* @return Current IO policy
*/