You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ip...@apache.org on 2019/12/23 15:17:16 UTC

[ignite] branch master updated: IGNITE-12466 Monitor query pool starvation - Fixes #7161.

This is an automated email from the ASF dual-hosted git repository.

ipavlukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7272bb1  IGNITE-12466 Monitor query pool starvation - Fixes #7161.
7272bb1 is described below

commit 7272bb18656b53d03b1ca7433eb9d1dbb08d18ca
Author: ktkalenko <kt...@gridgain.com>
AuthorDate: Mon Dec 23 18:17:00 2019 +0300

    IGNITE-12466 Monitor query pool starvation - Fixes #7161.
    
    Signed-off-by: ipavlukhin <vo...@gmail.com>
---
 .../org/apache/ignite/internal/IgniteKernal.java   |   9 ++
 .../query/IgniteQueryDedicatedPoolTest.java        | 124 ++++++++++++++++++++-
 2 files changed, 128 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c44d2ca..0096190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1421,6 +1421,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 /** Last completed task count. */
                 private long lastCompletedCntSys;
 
+                /** Last completed task count of the query pool. */
+                private long lastCompletedCntQry;
+
                 @Override public void run() {
                     if (execSvc instanceof ThreadPoolExecutor) {
                         ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
@@ -1434,6 +1437,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                         lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system");
                     }
 
+                    if (qryExecSvc instanceof ThreadPoolExecutor) {
+                        ThreadPoolExecutor exec = (ThreadPoolExecutor)qryExecSvc;
+
+                        lastCompletedCntQry = checkPoolStarvation(exec, lastCompletedCntQry, "query");
+                    }
+
                     if (stripedExecSvc != null)
                         stripedExecSvc.detectStarvation();
                 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
index df0fc7c..15945b4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import javax.cache.Cache;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import javax.cache.Cache;
+import java.util.concurrent.CyclicBarrier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -33,6 +34,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -41,27 +43,48 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingSpi;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.SystemPropertiesRule;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import static java.util.Objects.nonNull;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * Ensures that SQL queries are executed in a dedicated thread pool.
  */
 public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
+    /** Class rule. */
+    @ClassRule public static final TestRule classRule = new SystemPropertiesRule();
+
     /** Name of the cache for test */
     private static final String CACHE_NAME = "query_pool_test";
 
+    /** Test logger on which log listeners are registered. */
+    private static ListeningTestLogger testLog;
+
+    /** Query thread pool size. */
+    private Integer qryPoolSize;
+
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
 
-        startGrid("server");
+        testLog = new ListeningTestLogger(false, log);
     }
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
+        IgniteConfiguration cfg = super.getConfiguration(gridName)
+            .setGridLogger(testLog);
 
         CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
@@ -77,6 +100,9 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
 
         cfg.setIndexingSpi(new TestIndexingSpi());
 
+        if (nonNull(qryPoolSize))
+            cfg.setQueryThreadPoolSize(qryPoolSize);
+
         return cfg;
     }
 
@@ -85,6 +111,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
         super.afterTest();
 
         stopAllGrids();
+
+        testLog.clearListeners();
     }
 
     /**
@@ -93,6 +121,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
      */
     @Test
     public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
+        startGrid("server");
+
         try (Ignite client = startGrid("client")) {
             IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
 
@@ -122,6 +152,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
      */
     @Test
     public void testScanQueryUsesDedicatedThreadPool() throws Exception {
+        startGrid("server");
+
         try (Ignite client = startGrid("client")) {
             IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
 
@@ -146,6 +178,8 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
      */
     @Test
     public void testSpiQueryUsesDedicatedThreadPool() throws Exception {
+        startGrid("server");
+
         try (Ignite client = startGrid("client")) {
             IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME);
 
@@ -164,6 +198,86 @@ public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that the query pool starvation message appears in the log when the starvation check interval is "10".
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "10")
+    public void testContainsStarvationQryPoolInLog() throws Exception {
+        checkStarvationQryPoolInLog(
+            10_000,
+            "Possible thread pool starvation detected (no task completed in last 10ms, is query thread pool size " +
+                "large enough?)",
+            true
+        );
+    }
+
+    /**
+     * Checks that no pool starvation message appears in the log when the starvation check interval is "0".
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "0")
+    public void testNotContainsStarvationQryPoolInLog() throws Exception {
+        checkStarvationQryPoolInLog(
+            1_000,
+            "Possible thread pool starvation detected (no task completed in",
+            false
+        );
+    }
+
+    /**
+     * Runs two slow scan queries against a one-thread query pool and checks the log for the given message.
+     *
+     * @param checkTimeout Maximum time to wait for the log message (passed to {@code waitForCondition}).
+     * @param findLogMsg Log message to look for; must not be {@code null}.
+     * @param contains {@code true} if the message is expected in the log, {@code false} if it must not appear.
+     * @throws Exception If failed.
+     */
+    private void checkStarvationQryPoolInLog(long checkTimeout, String findLogMsg ,boolean contains) throws Exception {
+        assertNotNull(findLogMsg);
+
+        qryPoolSize = 1;
+
+        startGrid("server");
+
+        IgniteEx clientNode = startGrid("client");
+
+        IgniteCache<Integer, Integer> cache = clientNode.cache(CACHE_NAME);
+        cache.put(0, 0);
+
+        int qrySize = 2;
+
+        CyclicBarrier barrier = new CyclicBarrier(qrySize);
+
+        LogListener logLsnr = LogListener.matches(findLogMsg).build();
+
+        testLog.registerListener(logLsnr);
+
+        for (int i = 0; i < qrySize; i++) {
+            runAsync(() -> {
+                barrier.await();
+
+                cache.query(new ScanQuery<>((o, o2) -> {
+                        doSleep(500);
+
+                        return true;
+                    })
+                ).getAll();
+
+                return null;
+            });
+        }
+
+        if (contains)
+            assertTrue(waitForCondition(logLsnr::check, checkTimeout));
+        else
+            assertFalse(waitForCondition(logLsnr::check, checkTimeout));
+    }
+
+    /**
      * Custom SQL function to return current thread name from inside query executor
      * @return Current IO policy
      */