You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2020/03/10 10:08:23 UTC
[ignite] branch master updated: IGNITE-12750: Fix SQL index build
thread pool creation. This closes #7502.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ba95fd1 IGNITE-12750: Fix SQL index build thread pool creation. This closes #7502.
ba95fd1 is described below
commit ba95fd13829a821ed8c7cb7c13820e293bb3aa5d
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Mar 10 13:08:12 2020 +0300
IGNITE-12750: Fix SQL index build thread pool creation. This closes #7502.
Signed-off-by: Andrey V. Mashenkov <an...@gmail.com>
---
.../org/apache/ignite/internal/IgnitionEx.java | 6 +-
.../communication/TransmissionHandler.java | 1 -
.../ignite/thread/IgniteThreadPoolExecutor.java | 9 ++
.../query/h2/GridIndexRebuildSelfTest.java | 152 ++++++++++++++++++---
.../GridIndexRebuildWithMvccEnabledSelfTest.java | 5 +-
5 files changed, 144 insertions(+), 29 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f9f031f..cdc2353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1961,13 +1961,15 @@ public class IgnitionEx {
buildIdxExecSvc = new IgniteThreadPoolExecutor(
"build-idx-runner",
cfg.getIgniteInstanceName(),
- 0,
buildIdxThreadPoolSize,
- 0,
+ buildIdxThreadPoolSize,
+ DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
oomeHnd
);
+
+ buildIdxExecSvc.allowCoreThreadTimeOut(true);
}
validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
index c809dc7..da0dd69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -78,7 +78,6 @@ public interface TransmissionHandler {
*/
public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
-
/**
* @param nodeId Remote node id on which the error occurred.
* @param err The err of fail handling process.
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index fed77ad..20d2e49 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -32,6 +32,9 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Creates a new service with the given initial parameters.
*
+ * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+ * then the pool will degrade to a single-threaded pool.
+ *
* @param threadNamePrefix Will be added at the beginning of all created threads.
* @param igniteInstanceName Must be the name of the grid.
* @param corePoolSize The number of threads to keep in the pool, even if they are idle.
@@ -61,6 +64,9 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Creates a new service with the given initial parameters.
*
+ * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+ * then the pool will degrade to a single-threaded pool.
+ *
* @param threadNamePrefix Will be added at the beginning of all created threads.
* @param igniteInstanceName Must be the name of the grid.
* @param corePoolSize The number of threads to keep in the pool, even if they are idle.
@@ -94,6 +100,9 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Creates a new service with the given initial parameters.
*
+ * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+ * then the pool will degrade to a single-threaded pool.
+ *
* @param corePoolSize The number of threads to keep in the pool, even if they are idle.
* @param maxPoolSize The maximum number of threads to allow in the pool.
* @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
index 91db4cf..fc98c03 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
@@ -18,7 +18,10 @@
package org.apache.ignite.internal.processors.query.h2;
import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
+import java.util.stream.LongStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -32,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.index.DynamicIndexAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -39,6 +43,9 @@ import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
+import static java.util.Objects.nonNull;
+import static java.util.Objects.requireNonNull;
+
/**
* Index rebuild after node restart test.
*/
@@ -55,6 +62,12 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
/** Latch to signal that rebuild may start. */
private final CountDownLatch rebuildLatch = new CountDownLatch(1);
+ /** Thread pool size for index build. */
+ private Integer buildIdxThreadPoolSize;
+
+ /** GridQueryIndexing class. */
+ private Class<? extends GridQueryIndexing> qryIndexingCls = BlockingIndexing.class;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
IgniteConfiguration cfg = super.commonConfiguration(idx);
@@ -63,6 +76,19 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
.setMaxSize(300*1024L*1024L)
.setPersistenceEnabled(true);
+ if (nonNull(buildIdxThreadPoolSize))
+ cfg.setBuildIndexThreadPoolSize(buildIdxThreadPoolSize);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration serverConfiguration(int idx) throws Exception {
+ IgniteConfiguration cfg = super.serverConfiguration(idx);
+
+ if (nonNull(qryIndexingCls))
+ GridQueryProcessor.idxCls = qryIndexingCls;
+
return cfg;
}
@@ -83,6 +109,7 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
stopAllGrids();
cleanPersistenceDir();
+ GridQueryProcessor.idxCls = null;
}
/** {@inheritDoc} */
@@ -119,32 +146,121 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
public void testIndexRebuild() throws Exception {
IgniteEx srv = startServer();
- execute(srv, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=T,wrap_value=false," +
- "atomicity=transactional\"");
+ IgniteInternalCache cc = createAndFillTableWithIndex(srv);
- execute(srv, "CREATE INDEX IDX ON T(v)");
+ checkDataState(srv, false);
- IgniteInternalCache cc = srv.cachex(CACHE_NAME);
+ File idxPath = indexFile(cc);
- assertNotNull(cc);
+ stopAllGrids();
- putData(srv, false);
+ assertTrue(U.delete(idxPath));
- checkDataState(srv, false);
+ srv = startServer();
+
+ putData(srv, true);
+
+ checkDataState(srv, true);
+ }
+
+ /**
+ * Test checks that index rebuild uses the default thread pool size.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testDefaultCntThreadForRebuildIdx() throws Exception {
+ checkCntThreadForRebuildIdx(IgniteConfiguration.DFLT_BUILD_IDX_THREAD_POOL_SIZE);
+ }
+
+ /**
+ * Test checks that index rebuild uses the number of threads that is specified
+ * in the configuration.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testCustomCntThreadForRebuildIdx() throws Exception {
+ checkCntThreadForRebuildIdx(6);
+ }
+
+ /**
+ * Checks that index rebuild uses the number of threads
+ * that is specified in the configuration.
+ *
+ * @param buildIdxThreadCnt Thread pool size for index build,
+ * applied when the node is restarted.
+ * @throws Exception if failed.
+ */
+ private void checkCntThreadForRebuildIdx(int buildIdxThreadCnt) throws Exception {
+ qryIndexingCls = null;
+
+ IgniteEx srv = startServer();
- File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration());
+ IgniteInternalCache internalCache = createAndFillTableWithIndex(srv);
- File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile();
+ int partCnt = internalCache.configuration().getAffinity().partitions();
+
+ assertTrue(partCnt > buildIdxThreadCnt);
+
+ File idxPath = indexFile(internalCache);
stopAllGrids();
assertTrue(U.delete(idxPath));
+ buildIdxThreadPoolSize = buildIdxThreadCnt;
+
srv = startServer();
+ srv.cache(CACHE_NAME).indexReadyFuture().get();
- putData(srv, true);
+ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
- checkDataState(srv, true);
+ long buildIdxRunnerCnt = LongStream.of(threadMXBean.getAllThreadIds()).mapToObj(threadMXBean::getThreadInfo)
+ .filter(threadInfo -> threadInfo.getThreadName().startsWith("build-idx-runner")).count();
+
+ assertEquals(buildIdxThreadCnt, buildIdxRunnerCnt);
+ }
+
+ /**
+ * Creates a cache, a table and an index, then populates data.
+ *
+ * @param node Node.
+ * @return Cache.
+ * @throws Exception if failed.
+ */
+ private IgniteInternalCache createAndFillTableWithIndex(IgniteEx node) throws Exception {
+ requireNonNull(node);
+
+ String cacheName = CACHE_NAME;
+
+ execute(node, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=" + cacheName +
+ ",wrap_value=false,atomicity=transactional\"");
+
+ execute(node, "CREATE INDEX IDX ON T(v)");
+
+ IgniteInternalCache cc = node.cachex(cacheName);
+
+ assertNotNull(cc);
+
+ putData(node, false);
+
+ return cc;
+ }
+
+ /**
+ * Gets the index file.
+ *
+ * @param internalCache Cache.
+ * @return Index file.
+ */
+ protected File indexFile(IgniteInternalCache internalCache) {
+ requireNonNull(internalCache);
+
+ File cacheWorkDir = ((FilePageStoreManager)internalCache.context().shared().pageStore())
+ .cacheWorkDir(internalCache.configuration());
+
+ return cacheWorkDir.toPath().resolve("index.bin").toFile();
}
/**
@@ -214,17 +330,9 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
* @throws Exception if failed.
*/
protected IgniteEx startServer() throws Exception {
- // Have to do this for each starting node - see GridQueryProcessor ctor, it nulls
- // idxCls static field on each call.
- GridQueryProcessor.idxCls = BlockingIndexing.class;
-
- IgniteConfiguration cfg = serverConfiguration(0);
-
- IgniteEx res = startGrid(cfg);
-
- res.active(true);
-
- return res;
+ IgniteEx srvNode = startGrid(serverConfiguration(0));
+ srvNode.active(true);
+ return srvNode;
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
index 7211843..0eb7b29 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -64,9 +63,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel
checkDataState(srv, false);
- File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration());
-
- File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile();
+ File idxPath = indexFile(cc);
stopAllGrids();