You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2020/03/10 10:08:23 UTC

[ignite] branch master updated: IGNITE-12750: Fix SQL index build thread pool creation. This closes #7502.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ba95fd1  IGNITE-12750: Fix SQL index build thread pool creation. This closes #7502.
ba95fd1 is described below

commit ba95fd13829a821ed8c7cb7c13820e293bb3aa5d
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Mar 10 13:08:12 2020 +0300

    IGNITE-12750: Fix SQL index build thread pool creation. This closes #7502.
    
    Signed-off-by: Andrey V. Mashenkov <an...@gmail.com>
---
 .../org/apache/ignite/internal/IgnitionEx.java     |   6 +-
 .../communication/TransmissionHandler.java         |   1 -
 .../ignite/thread/IgniteThreadPoolExecutor.java    |   9 ++
 .../query/h2/GridIndexRebuildSelfTest.java         | 152 ++++++++++++++++++---
 .../GridIndexRebuildWithMvccEnabledSelfTest.java   |   5 +-
 5 files changed, 144 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f9f031f..cdc2353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1961,13 +1961,15 @@ public class IgnitionEx {
                 buildIdxExecSvc = new IgniteThreadPoolExecutor(
                     "build-idx-runner",
                     cfg.getIgniteInstanceName(),
-                    0,
                     buildIdxThreadPoolSize,
-                    0,
+                    buildIdxThreadPoolSize,
+                    DFLT_THREAD_KEEP_ALIVE_TIME,
                     new LinkedBlockingQueue<>(),
                     GridIoPolicy.UNDEFINED,
                     oomeHnd
                 );
+
+                buildIdxExecSvc.allowCoreThreadTimeOut(true);
             }
 
             validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
index c809dc7..da0dd69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -78,7 +78,6 @@ public interface TransmissionHandler {
      */
     public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
 
-
     /**
      * @param nodeId Remote node id on which the error occurred.
      * @param err The err of fail handling process.
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index fed77ad..20d2e49 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -32,6 +32,9 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
     /**
      * Creates a new service with the given initial parameters.
      *
+     * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+     * then the pool will degrade to a single-threaded pool.
+     *
      * @param threadNamePrefix Will be added at the beginning of all created threads.
      * @param igniteInstanceName Must be the name of the grid.
      * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
@@ -61,6 +64,9 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
     /**
      * Creates a new service with the given initial parameters.
      *
+     * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+     * then the pool will degrade to a single-threaded pool.
+     *
      * @param threadNamePrefix Will be added at the beginning of all created threads.
      * @param igniteInstanceName Must be the name of the grid.
      * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
@@ -94,6 +100,9 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
     /**
      * Creates a new service with the given initial parameters.
      *
+     * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+     * then the pool will degrade to a single-threaded pool.
+     *
      * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
      * @param maxPoolSize The maximum number of threads to allow in the pool.
      * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
index 91db4cf..fc98c03 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.LongStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -32,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.index.DynamicIndexAbstractSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -39,6 +43,9 @@ import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Test;
 
+import static java.util.Objects.nonNull;
+import static java.util.Objects.requireNonNull;
+
 /**
  * Index rebuild after node restart test.
  */
@@ -55,6 +62,12 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
     /** Latch to signal that rebuild may start. */
     private final CountDownLatch rebuildLatch = new CountDownLatch(1);
 
+    /** Thread pool size for index build. */
+    private Integer buildIdxThreadPoolSize;
+
+    /** GridQueryIndexing class. */
+    private Class<? extends GridQueryIndexing> qryIndexingCls = BlockingIndexing.class;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
         IgniteConfiguration cfg =  super.commonConfiguration(idx);
@@ -63,6 +76,19 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
             .setMaxSize(300*1024L*1024L)
             .setPersistenceEnabled(true);
 
+        if (nonNull(buildIdxThreadPoolSize))
+            cfg.setBuildIndexThreadPoolSize(buildIdxThreadPoolSize);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration serverConfiguration(int idx) throws Exception {
+        IgniteConfiguration cfg = super.serverConfiguration(idx);
+
+        if (nonNull(qryIndexingCls))
+            GridQueryProcessor.idxCls = qryIndexingCls;
+
         return cfg;
     }
 
@@ -83,6 +109,7 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
         stopAllGrids();
 
         cleanPersistenceDir();
+        GridQueryProcessor.idxCls = null;
     }
 
     /** {@inheritDoc} */
@@ -119,32 +146,121 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
     public void testIndexRebuild() throws Exception {
         IgniteEx srv = startServer();
 
-        execute(srv, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=T,wrap_value=false," +
-            "atomicity=transactional\"");
+        IgniteInternalCache cc = createAndFillTableWithIndex(srv);
 
-        execute(srv, "CREATE INDEX IDX ON T(v)");
+        checkDataState(srv, false);
 
-        IgniteInternalCache cc = srv.cachex(CACHE_NAME);
+        File idxPath = indexFile(cc);
 
-        assertNotNull(cc);
+        stopAllGrids();
 
-        putData(srv, false);
+        assertTrue(U.delete(idxPath));
 
-        checkDataState(srv, false);
+        srv = startServer();
+
+        putData(srv, true);
+
+        checkDataState(srv, true);
+    }
+
+    /**
+     * Test checks that index rebuild uses the default thread pool size.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testDefaultCntThreadForRebuildIdx() throws Exception {
+        checkCntThreadForRebuildIdx(IgniteConfiguration.DFLT_BUILD_IDX_THREAD_POOL_SIZE);
+    }
+
+    /**
+     * Test checks that index rebuild uses the number of threads specified
+     * in the configuration.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testCustomCntThreadForRebuildIdx() throws Exception {
+        checkCntThreadForRebuildIdx(6);
+    }
+
+    /**
+     * Checks that index rebuild uses the number of threads
+     * specified in the configuration.
+     *
+     * @param buildIdxThreadCnt Thread pool size for index build,
+     *      applied after node restart.
+     * @throws Exception if failed.
+     */
+    private void checkCntThreadForRebuildIdx(int buildIdxThreadCnt) throws Exception {
+        qryIndexingCls = null;
+
+        IgniteEx srv = startServer();
 
-        File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration());
+        IgniteInternalCache internalCache = createAndFillTableWithIndex(srv);
 
-        File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile();
+        int partCnt = internalCache.configuration().getAffinity().partitions();
+
+        assertTrue(partCnt > buildIdxThreadCnt);
+
+        File idxPath = indexFile(internalCache);
 
         stopAllGrids();
 
         assertTrue(U.delete(idxPath));
 
+        buildIdxThreadPoolSize = buildIdxThreadCnt;
+
         srv = startServer();
+        srv.cache(CACHE_NAME).indexReadyFuture().get();
 
-        putData(srv, true);
+        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
 
-        checkDataState(srv, true);
+        long buildIdxRunnerCnt = LongStream.of(threadMXBean.getAllThreadIds()).mapToObj(threadMXBean::getThreadInfo)
+            .filter(threadInfo -> threadInfo.getThreadName().startsWith("build-idx-runner")).count();
+
+        assertEquals(buildIdxThreadCnt, buildIdxRunnerCnt);
+    }
+
+    /**
+     * Creates a cache, table and index, and populates the table with data.
+     *
+     * @param node Node.
+     * @return Cache.
+     * @throws Exception if failed.
+     */
+    private IgniteInternalCache createAndFillTableWithIndex(IgniteEx node) throws Exception {
+        requireNonNull(node);
+
+        String cacheName = CACHE_NAME;
+
+        execute(node, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=" + cacheName +
+            ",wrap_value=false,atomicity=transactional\"");
+
+        execute(node, "CREATE INDEX IDX ON T(v)");
+
+        IgniteInternalCache cc = node.cachex(cacheName);
+
+        assertNotNull(cc);
+
+        putData(node, false);
+
+        return cc;
+    }
+
+    /**
+     * Get index file.
+     *
+     * @param internalCache Cache.
+     * @return Index file.
+     */
+    protected File indexFile(IgniteInternalCache internalCache) {
+        requireNonNull(internalCache);
+
+        File cacheWorkDir = ((FilePageStoreManager)internalCache.context().shared().pageStore())
+            .cacheWorkDir(internalCache.configuration());
+
+        return cacheWorkDir.toPath().resolve("index.bin").toFile();
     }
 
     /**
@@ -214,17 +330,9 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
      * @throws Exception if failed.
      */
     protected IgniteEx startServer() throws Exception {
-        // Have to do this for each starting node - see GridQueryProcessor ctor, it nulls
-        // idxCls static field on each call.
-        GridQueryProcessor.idxCls = BlockingIndexing.class;
-
-        IgniteConfiguration cfg = serverConfiguration(0);
-
-        IgniteEx res = startGrid(cfg);
-
-        res.active(true);
-
-        return res;
+        IgniteEx srvNode = startGrid(serverConfiguration(0));
+        srvNode.active(true);
+        return srvNode;
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
index 7211843..0eb7b29 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -64,9 +63,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel
 
         checkDataState(srv, false);
 
-        File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration());
-
-        File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile();
+        File idxPath = indexFile(cc);
 
         stopAllGrids();