You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/02/21 08:43:35 UTC
[ignite] branch master updated: IGNITE-12684 Use dedicated pool for
indexes rebuilding. - Fixes #7432.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f52350c IGNITE-12684 Use dedicated pool for indexes rebuilding. - Fixes #7432.
f52350c is described below
commit f52350c552f957e4c867bab9431b45ea1f16dc62
Author: ktkalenko <kt...@gridgain.com>
AuthorDate: Fri Feb 21 11:42:55 2020 +0300
IGNITE-12684 Use dedicated pool for indexes rebuilding. - Fixes #7432.
Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
.../org/apache/ignite/IgniteSystemProperties.java | 9 -
.../ignite/configuration/IgniteConfiguration.java | 36 ++-
.../apache/ignite/internal/GridKernalContext.java | 7 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../org/apache/ignite/internal/IgniteKernal.java | 3 +
.../org/apache/ignite/internal/IgnitionEx.java | 23 ++
.../GridCacheDatabaseSharedManager.java | 99 +++---
.../wal/reader/StandaloneGridKernalContext.java | 5 +
.../processors/query/GridQueryIndexing.java | 2 +-
.../processors/query/GridQueryProcessor.java | 44 ++-
.../schema/SchemaIndexCachePartitionWorker.java | 270 ++++++++++++++++
.../query/schema/SchemaIndexCacheVisitor.java | 5 +-
.../query/schema/SchemaIndexCacheVisitorImpl.java | 340 ++++-----------------
.../junits/GridTestKernalContext.java | 1 +
.../processors/query/h2/IgniteH2Indexing.java | 73 ++---
.../CacheGroupMetricsWithIndexBuildFailTest.java | 19 +-
.../cache/CacheGroupMetricsWithIndexTest.java | 9 +-
.../cache/index/AbstractIndexingCommonTest.java | 54 +++-
.../query/h2/GridIndexRebuildSelfTest.java | 22 +-
19 files changed, 611 insertions(+), 422 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 22b998e..d7596d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1247,15 +1247,6 @@ public final class IgniteSystemProperties {
public static final String IGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP = "IGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP";
/**
- * Index rebuilding parallelism level. It sets a number of threads will be used for index rebuilding.
- * Zero value means default should be used.
- * Default value is calculated as <code>CPU count / 4</code> with upper limit of <code>4</code>.
- * <p>
- * Note: Number of threads is bounded within the range from <code>1</code> up to <code>CPU count</code>.
- */
- public static final String INDEX_REBUILDING_PARALLELISM = "INDEX_REBUILDING_PARALLELISM";
-
- /**
* Threshold timeout for long transactions, if transaction exceeds it, it will be dumped in log with
* information about how much time did it spent in system time (time while aquiring locks, preparing,
* commiting, etc) and user time (time when client node runs some code while holding transaction and not
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index a086e04..2ba4e29 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -86,6 +86,8 @@ import org.apache.ignite.spi.systemview.SystemViewExporterSpi;
import org.apache.ignite.ssl.SslContextFactory;
import org.jetbrains.annotations.Nullable;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.STOP;
/**
@@ -154,7 +156,7 @@ public class IgniteConfiguration {
public static final int AVAILABLE_PROC_CNT = Runtime.getRuntime().availableProcessors();
/** Default core size of public thread pool. */
- public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT);
+ public static final int DFLT_PUBLIC_THREAD_CNT = max(8, AVAILABLE_PROC_CNT);
/** Default size of data streamer thread pool. */
public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
@@ -180,6 +182,9 @@ public class IgniteConfiguration {
/** Default size of query thread pool. */
public static final int DFLT_QUERY_THREAD_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
+ /** Default size of index create/rebuild thread pool. */
+ public static final int DFLT_BUILD_IDX_THREAD_POOL_SIZE = min(4, max(1, AVAILABLE_PROC_CNT / 4));
+
/** Default Ignite thread keep alive time. */
public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
@@ -302,6 +307,9 @@ public class IgniteConfiguration {
/** Query pool size. */
private int qryPoolSize = DFLT_QUERY_THREAD_POOL_SIZE;
+ /** Index create/rebuild pool size. */
+ private int buildIdxPoolSize = DFLT_BUILD_IDX_THREAD_POOL_SIZE;
+
/** SQL query history size. */
private int sqlQryHistSize = DFLT_SQL_QUERY_HISTORY_SIZE;
@@ -693,6 +701,7 @@ public class IgniteConfiguration {
pluginProvs = cfg.getPluginProviders();
pubPoolSize = cfg.getPublicThreadPoolSize();
qryPoolSize = cfg.getQueryThreadPoolSize();
+ buildIdxPoolSize = cfg.getBuildIndexThreadPoolSize();
rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
rebalanceTimeout = cfg.getRebalanceTimeout();
rebalanceBatchesPrefetchCnt = cfg.getRebalanceBatchesPrefetchCount();
@@ -1082,6 +1091,31 @@ public class IgniteConfiguration {
}
/**
+ * Size of thread pool for create/rebuild index.
+ * <p>
+ * If not provided, executor service will have size
+ * {@link #DFLT_BUILD_IDX_THREAD_POOL_SIZE}.
+ *
+ * @return Thread pool size for create/rebuild index.
+ */
+ public int getBuildIndexThreadPoolSize() {
+ return buildIdxPoolSize;
+ }
+
+ /**
+ * Sets index create/rebuild thread pool size to use within grid.
+ *
+ * @param poolSize Thread pool size to use within grid.
+ * @return {@code this} for chaining.
+ * @see IgniteConfiguration#getBuildIndexThreadPoolSize()
+ */
+ public IgniteConfiguration setBuildIndexThreadPoolSize(int poolSize) {
+ buildIdxPoolSize = poolSize;
+
+ return this;
+ }
+
+ /**
* Number of SQL query history elements to keep in memory. If not provided, then default value {@link
* #DFLT_SQL_QUERY_HISTORY_SIZE} is used. If provided value is less or equals 0, then gathering SQL query history
* will be switched off.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 5be2327..af9f777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -776,4 +776,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Local continuous tasks processor.
*/
public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor();
+
+ /**
+ * Return Thread pool for create/rebuild indexes.
+ *
+ * @return Thread pool for create/rebuild indexes.
+ */
+ public ExecutorService buildIndexExecutorService();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4bd3aa9..7196dea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -383,6 +383,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringExclude
protected ExecutorService idxExecSvc;
+ /** Thread pool for create/rebuild indexes. */
+ @GridToStringExclude
+ private ExecutorService buildIdxExecSvc;
+
/** */
@GridToStringExclude
protected IgniteStripedThreadPoolExecutor callbackExecSvc;
@@ -490,6 +494,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param restExecSvc REST executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
+ * @param buildIdxExecSvc Create/rebuild indexes executor service.
* @param callbackExecSvc Callback executor service.
* @param qryExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
@@ -519,6 +524,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
+ @Nullable ExecutorService buildIdxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc,
@@ -550,6 +556,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.restExecSvc = restExecSvc;
this.affExecSvc = affExecSvc;
this.idxExecSvc = idxExecSvc;
+ this.buildIdxExecSvc = buildIdxExecSvc;
this.callbackExecSvc = callbackExecSvc;
this.qryExecSvc = qryExecSvc;
this.schemaExecSvc = schemaExecSvc;
@@ -1317,4 +1324,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
return durableBackgroundTasksProcessor;
}
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService buildIndexExecutorService() {
+ return buildIdxExecSvc;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 9892574..96c04b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -968,6 +968,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @param restExecSvc Reset executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
+ * @param buildIdxExecSvc Create/rebuild indexes executor service.
* @param callbackExecSvc Callback executor service.
* @param qryExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
@@ -993,6 +994,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
+ @Nullable ExecutorService buildIdxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc,
@@ -1119,6 +1121,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
restExecSvc,
affExecSvc,
idxExecSvc,
+ buildIdxExecSvc,
callbackExecSvc,
qryExecSvc,
schemaExecSvc,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 42194c5..f9f031f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1565,6 +1565,9 @@ public class IgnitionEx {
/** Indexing pool. */
private ThreadPoolExecutor idxExecSvc;
+ /** Thread pool for create/rebuild indexes. */
+ private ThreadPoolExecutor buildIdxExecSvc;
+
/** Continuous query executor service. */
private IgniteStripedThreadPoolExecutor callbackExecSvc;
@@ -1950,6 +1953,21 @@ public class IgnitionEx {
GridIoPolicy.IDX_POOL,
oomeHnd
);
+
+ int buildIdxThreadPoolSize = cfg.getBuildIndexThreadPoolSize();
+
+ validateThreadPoolSize(buildIdxThreadPoolSize, "build-idx");
+
+ buildIdxExecSvc = new IgniteThreadPoolExecutor(
+ "build-idx-runner",
+ cfg.getIgniteInstanceName(),
+ 0,
+ buildIdxThreadPoolSize,
+ 0,
+ new LinkedBlockingQueue<>(),
+ GridIoPolicy.UNDEFINED,
+ oomeHnd
+ );
}
validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
@@ -2047,6 +2065,7 @@ public class IgnitionEx {
restExecSvc,
affExecSvc,
idxExecSvc,
+ buildIdxExecSvc,
callbackExecSvc,
qryExecSvc,
schemaExecSvc,
@@ -2756,6 +2775,10 @@ public class IgnitionEx {
idxExecSvc = null;
+ U.shutdownNow(getClass(), buildIdxExecSvc, log);
+
+ buildIdxExecSvc = null;
+
U.shutdownNow(getClass(), callbackExecSvc, log);
callbackExecSvc = null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index b11bb62..0c5c378 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -76,7 +76,6 @@ import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -167,7 +166,6 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
@@ -192,6 +190,8 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static java.nio.file.StandardOpenOption.READ;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
@@ -1489,68 +1489,79 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
+ @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture exchangeFut) {
GridQueryProcessor qryProc = cctx.kernalContext().query();
- if (qryProc.moduleEnabled()) {
- GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
- cctx.cacheContexts().size(),
- () -> log().info("Indexes rebuilding completed for all caches."),
- 1 //need at least 1 index rebuilded to print message about rebuilding completion
- );
+ if (!qryProc.moduleEnabled())
+ return;
- for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
- if (cacheCtx.startTopologyVersion().equals(fut.initialVersion())) {
- final int cacheId = cacheCtx.cacheId();
- final GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
+ Collection<GridCacheContext> cacheContexts = cctx.cacheContexts();
- IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
+ GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
+ cacheContexts.size(),
+ () -> {
+ if (log.isInfoEnabled())
+ log.info("Indexes rebuilding completed for all caches.");
+ },
+ 1 //need at least 1 index rebuilded to print message about rebuilding completion
+ );
- if (rebuildFut != null) {
- log().info("Started indexes rebuilding for cache [name=" + cacheCtx.name()
- + ", grpName=" + cacheCtx.group().name() + ']');
+ for (GridCacheContext cacheCtx : cacheContexts) {
+ if (!cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()))
+ continue;
- assert usrFut != null : "Missing user future for cache: " + cacheCtx.name();
+ int cacheId = cacheCtx.cacheId();
+ GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
- rebuildFut.listen(new CI1<IgniteInternalFuture>() {
- @Override public void apply(IgniteInternalFuture fut) {
- idxRebuildFuts.remove(cacheId, usrFut);
+ IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
- Throwable err = fut.error();
+ if (nonNull(rebuildFut)) {
+ if (log.isInfoEnabled())
+ log.info("Started indexes rebuilding for cache [" + cacheInfo(cacheCtx) + ']');
- usrFut.onDone(err);
+ assert nonNull(usrFut) : "Missing user future for cache: " + cacheCtx.name();
- CacheConfiguration ccfg = cacheCtx.config();
+ rebuildFut.listen(fut -> {
+ idxRebuildFuts.remove(cacheId, usrFut);
- if (ccfg != null) {
- if (err == null)
- log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName()
- + ", grpName=" + ccfg.getGroupName() + ']');
- else {
- if (!(err instanceof NodeStoppingException))
- log().error("Failed to rebuild indexes for cache [name=" + ccfg.getName()
- + ", grpName=" + ccfg.getGroupName() + ']', err);
- }
- }
+ Throwable err = fut.error();
- rebuildIndexesCompleteCntr.countDown(true);
- }
- });
+ usrFut.onDone(err);
+
+ if (isNull(err)) {
+ if (log.isInfoEnabled())
+ log.info("Finished indexes rebuilding for cache [" + cacheInfo(cacheCtx) + ']');
}
else {
- if (usrFut != null) {
- idxRebuildFuts.remove(cacheId, usrFut);
+ if (!(err instanceof NodeStoppingException))
+ log.error("Failed to rebuild indexes for cache [" + cacheInfo(cacheCtx) + ']', err);
+ }
- usrFut.onDone();
+ rebuildIndexesCompleteCntr.countDown(true);
+ });
+ }
+ else if (nonNull(usrFut)) {
+ idxRebuildFuts.remove(cacheId, usrFut);
- rebuildIndexesCompleteCntr.countDown(false);
- }
- }
- }
+ usrFut.onDone();
+
+ rebuildIndexesCompleteCntr.countDown(false);
}
}
}
+ /**
+ * Return short information about cache.
+ *
+ * @param cacheCtx Cache context.
+ * @return Short cache info.
+ */
+ private String cacheInfo(GridCacheContext cacheCtx) {
+ assert nonNull(cacheCtx);
+
+ return "name=" + cacheCtx.name() + ", grpName=" + cacheCtx.group().name();
+ }
+
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
return idxRebuildFuts.get(cacheId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e465545..88203c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -742,4 +742,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
@Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService buildIndexExecutorService() {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 4ca363e..8e11099 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -332,7 +332,7 @@ public interface GridQueryIndexing {
* @param cctx Cache context.
* @return Future completed when index rebuild finished.
*/
- public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx);
+ IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx);
/**
* Mark as rebuild needed for the given cache.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 5dfffee..01cc322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationClientFuture;
@@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -1570,7 +1572,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
if (op instanceof SchemaIndexCreateOperation) {
- final SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+ SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index());
@@ -1579,11 +1581,40 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (cacheInfo.isCacheContextInited()) {
GridCacheContext cctx = cacheInfo.cacheContext();
- SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName());
+ int buildIdxPoolSize = ctx.config().getBuildIndexThreadPoolSize();
+ int parallel = op0.parallel();
- cctx.group().metrics().addIndexBuildCountPartitionsLeft(cctx.topology().localPartitions().size());
+ if (parallel > buildIdxPoolSize) {
+ String idxName = op0.indexName();
- visitor = new SchemaIndexCacheVisitorImpl(cctx, filter, cancelTok, op0.parallel());
+ log.warning("Provided parallelism " + parallel + " for creation of index " + idxName +
+ " is greater than the number of index building threads. Will use " + buildIdxPoolSize +
+ " threads to build index. Increase by IgniteConfiguration.setBuildIndexThreadPoolSize" +
+ " and restart the node if you want to use more threads. [tableName=" + op0.tableName() +
+ ", indexName=" + idxName + ", requestedParallelism=" + parallel + ", buildIndexPoolSize=" +
+ buildIdxPoolSize + "]");
+ }
+
+ GridFutureAdapter<Void> createIdxFut = new GridFutureAdapter<>();
+
+ visitor = new SchemaIndexCacheVisitorImpl(
+ cacheInfo.cacheContext(),
+ new TableCacheFilter(cctx, op0.tableName()),
+ cancelTok,
+ createIdxFut
+ ) {
+ /** {@inheritDoc} */
+ @Override public void visit(SchemaIndexCacheVisitorClosure clo) {
+ super.visit(clo);
+
+ try {
+ buildIdxFut.get();
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ };
}
else
//For not started caches we shouldn't add any data to index.
@@ -1974,6 +2005,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Future that will be completed when rebuilding is finished.
*/
public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+ assert nonNull(cctx);
+
// Indexing module is disabled, nothing to rebuild.
if (rebuildIsMeaningless(cctx))
return null;
@@ -1992,9 +2025,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (empty)
return null;
- if (!busyLock.enterBusy())
+ if (!busyLock.enterBusy()) {
return new GridFinishedFuture<>(new NodeStoppingException("Failed to rebuild indexes from hash " +
"(grid is stopping)."));
+ }
try {
return idx.rebuildIndexesFromHash(cctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
new file mode 100644
index 0000000..c4657d0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Objects.nonNull;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY;
+
+/**
+ * Worker for creating/rebuilding indexes for cache per partition.
+ */
+public class SchemaIndexCachePartitionWorker extends GridWorker {
+ /** Count of rows, being processed within a single checkpoint lock. */
+ private static final int BATCH_SIZE = 1000;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Stop flag between all workers for one cache. */
+ private final AtomicBoolean stop;
+
+ /** Cancellation token between all workers for all caches. */
+ private final SchemaIndexOperationCancellationToken cancel;
+
+ /** Index closure. */
+ private final SchemaIndexCacheVisitorClosure clo;
+
+ /** Partition. */
+ private final GridDhtLocalPartition locPart;
+
+ /** Worker future. */
+ private final GridFutureAdapter<Void> fut;
+
+ /** Row filter. */
+ private final SchemaIndexCacheFilter rowFilter;
+
+ /** Count of partitions to be processed. */
+ private final AtomicInteger partsCnt;
+
+ /**
+ * Constructor.
+ *
+ * @param cctx Cache context.
+ * @param locPart Partition.
+ * @param stop Stop flag between all workers for one cache.
+ * @param cancel Cancellation token between all workers for all caches.
+ * @param clo Index closure.
+ * @param fut Worker future.
+ * @param rowFilter Row filter.
+ * @param partsCnt Count of partitions to be processed.
+ */
+ public SchemaIndexCachePartitionWorker(
+ GridCacheContext cctx,
+ GridDhtLocalPartition locPart,
+ AtomicBoolean stop,
+ SchemaIndexOperationCancellationToken cancel,
+ SchemaIndexCacheVisitorClosure clo,
+ GridFutureAdapter<Void> fut,
+ @Nullable SchemaIndexCacheFilter rowFilter,
+ AtomicInteger partsCnt
+ ) {
+ super(
+ cctx.igniteInstanceName(),
+ "parallel-idx-worker-" + cctx.cache().name() + "-part-" + locPart.id(),
+ cctx.logger(SchemaIndexCachePartitionWorker.class)
+ );
+
+ this.cctx = cctx;
+ this.locPart = locPart;
+ this.cancel = cancel;
+
+ assert nonNull(stop);
+ assert nonNull(clo);
+ assert nonNull(fut);
+ assert nonNull(partsCnt);
+
+ this.stop = stop;
+ this.clo = clo;
+ this.fut = fut;
+ this.partsCnt = partsCnt;
+
+ this.rowFilter = rowFilter;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ Throwable err = null;
+
+ try {
+ processPartition();
+ }
+ catch (Throwable e) {
+ err = Error.class.isInstance(e) ? new IgniteException(e) : e;
+
+ U.error(log, "Error during create/rebuild index for partition: " + locPart.id(), e);
+
+ stop.set(true);
+
+ int cnt = partsCnt.getAndSet(0);
+
+ if (cnt > 0)
+ cctx.group().metrics().addIndexBuildCountPartitionsLeft(-cnt);
+ }
+ finally {
+ fut.onDone(err);
+ }
+ }
+
+ /**
+ * Process partition.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processPartition() throws IgniteCheckedException {
+ if (stop.get() || stopNode())
+ return;
+
+ checkCancelled();
+
+ boolean reserved = false;
+
+ GridDhtPartitionState partState = locPart.state();
+ if (partState != EVICTED)
+ reserved = (partState == OWNING || partState == RENTING || partState == MOVING) && locPart.reserve();
+
+ if (!reserved)
+ return;
+
+ try {
+ GridCursor<? extends CacheDataRow> cursor = locPart.dataStore().cursor(
+ cctx.cacheId(),
+ null,
+ null,
+ KEY_ONLY
+ );
+
+ boolean locked = false;
+
+ try {
+ int cntr = 0;
+
+ while (cursor.next() && !stop.get() && !stopNode()) {
+ KeyCacheObject key = cursor.get().key();
+
+ if (!locked) {
+ cctx.shared().database().checkpointReadLock();
+
+ locked = true;
+ }
+
+ processKey(key);
+
+ if (++cntr % BATCH_SIZE == 0) {
+ cctx.shared().database().checkpointReadUnlock();
+
+ locked = false;
+ }
+
+ if (locPart.state() == RENTING)
+ break;
+ }
+ }
+ finally {
+ if (locked)
+ cctx.shared().database().checkpointReadUnlock();
+ }
+ }
+ finally {
+ locPart.release();
+
+ if (partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0)
+ cctx.group().metrics().decrementIndexBuildCountPartitionsLeft();
+ }
+ }
+
+ /**
+ * Process single key.
+ *
+ * @param key Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processKey(KeyCacheObject key) throws IgniteCheckedException {
+ assert nonNull(key);
+
+ while (true) {
+ try {
+ checkCancelled();
+
+ GridCacheEntryEx entry = cctx.cache().entryEx(key);
+
+ try {
+ entry.updateIndex(rowFilter, clo);
+ }
+ finally {
+ entry.touch();
+ }
+
+ break;
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * Check if visit process is not cancelled.
+ *
+ * @throws IgniteCheckedException If cancelled.
+ */
+ private void checkCancelled() throws IgniteCheckedException {
+ if (nonNull(cancel) && cancel.isCancelled())
+ throw new IgniteCheckedException("Index creation was cancelled.");
+ }
+
+ /**
+ * Returns node in the process of stopping or not.
+ *
+ * @return {@code True} if node is in the process of stopping.
+ */
+ private boolean stopNode() {
+ return cctx.kernalContext().isStopping();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SchemaIndexCachePartitionWorker.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
index 3321e66..3cdf1d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.query.schema;
-import org.apache.ignite.IgniteCheckedException;
-
/**
* Closure that internally applies given {@link SchemaIndexCacheVisitorClosure} to some set of entries.
*/
@@ -27,7 +25,6 @@ public interface SchemaIndexCacheVisitor {
* Visit cache entries and pass them to closure.
*
* @param clo Closure.
- * @throws IgniteCheckedException If failed.
*/
- public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException;
+ void visit(SchemaIndexCacheVisitorClosure clo);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 5a54b81..337ae15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -18,44 +18,25 @@
package org.apache.ignite.internal.processors.query.schema;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.internal.util.worker.GridWorkerFuture;
+import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.IgniteSystemProperties.INDEX_REBUILDING_PARALLELISM;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static java.util.Objects.nonNull;
/**
- * Traversor operating all primary and backup partitions of given cache.
+ * Visitor who create/rebuild indexes in parallel by partition for a given cache.
*/
public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
- /** Default degree of parallelism for rebuilding indexes. */
- private static final int DFLT_INDEX_REBUILDING_PARALLELISM;
-
- /** Count of rows, being processed within a single checkpoint lock. */
- private static final int BATCH_SIZE = 1000;
-
/** Cache context. */
private final GridCacheContext cctx;
@@ -65,315 +46,98 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
/** Cancellation token. */
private final SchemaIndexOperationCancellationToken cancel;
- /** Parallelism. */
- private final int parallelism;
-
- /** Whether to stop the process. */
- private volatile boolean stop;
-
- /** Count of partitions to be processed. */
- private final AtomicInteger partsCnt = new AtomicInteger();
+ /** Future for create/rebuild index. */
+ protected final GridFutureAdapter<Void> buildIdxFut;
/** Logger. */
protected IgniteLogger log;
- static {
- int parallelism = IgniteSystemProperties.getInteger(INDEX_REBUILDING_PARALLELISM, 0);
-
- // Parallelism lvl is bounded to range of [1, CPUs count]
- if (parallelism > 0)
- DFLT_INDEX_REBUILDING_PARALLELISM = Math.min(parallelism, Runtime.getRuntime().availableProcessors());
- else
- DFLT_INDEX_REBUILDING_PARALLELISM = Math.min(4, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
- }
-
- /**
- * Constructor.
- * @param cctx Cache context.
- */
- public SchemaIndexCacheVisitorImpl(GridCacheContext cctx) {
- this(cctx, null, null, 0);
- }
-
/**
* Constructor.
*
* @param cctx Cache context.
* @param rowFilter Row filter.
* @param cancel Cancellation token.
- * @param parallelism Degree of parallelism.
+ * @param buildIdxFut Future for create/rebuild index.
*/
- public SchemaIndexCacheVisitorImpl(GridCacheContext cctx, SchemaIndexCacheFilter rowFilter,
- SchemaIndexOperationCancellationToken cancel, int parallelism) {
- this.rowFilter = rowFilter;
- this.cancel = cancel;
-
- // Parallelism lvl is bounded to range of [1, CPUs count]
- if (parallelism > 0)
- this.parallelism = Math.min(Runtime.getRuntime().availableProcessors(), parallelism);
- else
- this.parallelism = DFLT_INDEX_REBUILDING_PARALLELISM;
+ public SchemaIndexCacheVisitorImpl(
+ GridCacheContext cctx,
+ @Nullable SchemaIndexCacheFilter rowFilter,
+ @Nullable SchemaIndexOperationCancellationToken cancel,
+ GridFutureAdapter<Void> buildIdxFut
+ ) {
+ assert nonNull(cctx);
+ assert nonNull(buildIdxFut);
if (cctx.isNear())
cctx = ((GridNearCacheAdapter)cctx.cache()).dht().context();
this.cctx = cctx;
+ this.buildIdxFut = buildIdxFut;
+
+ this.cancel = cancel;
+ this.rowFilter = rowFilter;
log = cctx.kernalContext().log(getClass());
}
/** {@inheritDoc} */
- @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
- assert clo != null;
-
- List<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
-
- if (parts.isEmpty())
- return;
-
- partsCnt.set(parts.size());
+ @Override public void visit(SchemaIndexCacheVisitorClosure clo) {
+ assert nonNull(clo);
- GridCompoundFuture<Void, Void> fut = null;
+ List<GridDhtLocalPartition> locParts = cctx.topology().localPartitions();
- if (parallelism > 1) {
- fut = new GridCompoundFuture<>();
+ if (locParts.isEmpty()) {
+ buildIdxFut.onDone();
- for (int i = 1; i < parallelism; i++)
- fut.add(processPartitionsAsync(parts, clo, i));
-
- fut.markInitialized();
- }
-
- try {
- processPartitions(parts, clo, 0);
- }
- catch (Throwable e) {
- U.error(log, "Error during parallel index create/rebuild.", e);
-
- stop = true;
-
- resetPartitionsCount();
-
- throw e;
- }
-
- if (fut != null)
- fut.get();
- }
-
- /**
- * Process partitions asynchronously.
- *
- * @param parts Partitions.
- * @param clo Closure.
- * @param remainder Remainder.
- * @return Future.
- */
- private GridFutureAdapter<Void> processPartitionsAsync(List<GridDhtLocalPartition> parts,
- SchemaIndexCacheVisitorClosure clo, int remainder) {
- GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
-
- AsyncWorker worker = new AsyncWorker(parts, clo, remainder, fut);
-
- new IgniteThread(worker).start();
-
- return fut;
- }
-
- /**
- * Process partitions.
- *
- * @param parts Partitions.
- * @param clo Closure.
- * @param remainder Remainder.
- * @throws IgniteCheckedException If failed.
- */
- private void processPartitions(List<GridDhtLocalPartition> parts, SchemaIndexCacheVisitorClosure clo,
- int remainder)
- throws IgniteCheckedException {
- for (int i = 0, size = parts.size(); i < size; i++) {
- if (stop)
- break;
-
- if ((i % parallelism) == remainder)
- processPartition(parts.get(i), clo);
- }
- }
-
- /**
- * Process partition.
- *
- * @param part Partition.
- * @param clo Index closure.
- * @throws IgniteCheckedException If failed.
- */
- private void processPartition(GridDhtLocalPartition part, SchemaIndexCacheVisitorClosure clo)
- throws IgniteCheckedException {
- checkCancelled();
-
- boolean reserved = false;
-
- if (part != null && part.state() != EVICTED)
- reserved = (part.state() == OWNING || part.state() == RENTING || part.state() == MOVING) && part.reserve();
-
- if (!reserved)
return;
+ }
- try {
- GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor(cctx.cacheId(), null, null,
- CacheDataRowAdapter.RowData.KEY_ONLY);
-
- boolean locked = false;
+ cctx.group().metrics().addIndexBuildCountPartitionsLeft(locParts.size());
- try {
- int cntr = 0;
+ beforeExecute();
- while (cursor.next() && !stop) {
- KeyCacheObject key = cursor.get().key();
+ AtomicInteger partsCnt = new AtomicInteger(locParts.size());
- if (!locked) {
- cctx.shared().database().checkpointReadLock();
+ AtomicBoolean stop = new AtomicBoolean();
- locked = true;
- }
+ GridCompoundFuture<Void, Void> buildIdxCompoundFut = new GridCompoundFuture<>();
- processKey(key, clo);
+ for (GridDhtLocalPartition locPart : locParts) {
+ GridWorkerFuture<Void> workerFut = new GridWorkerFuture<>();
- if (++cntr % BATCH_SIZE == 0) {
- cctx.shared().database().checkpointReadUnlock();
+ GridWorker worker = new SchemaIndexCachePartitionWorker(
+ cctx,
+ locPart,
+ stop,
+ cancel,
+ clo,
+ workerFut,
+ rowFilter,
+ partsCnt
+ );
- locked = false;
- }
+ workerFut.setWorker(worker);
+ buildIdxCompoundFut.add(workerFut);
- if (part.state() == RENTING)
- break;
- }
- }
- finally {
- if (locked)
- cctx.shared().database().checkpointReadUnlock();
- }
+ cctx.kernalContext().buildIndexExecutorService().execute(worker);
}
- finally {
- part.release();
- if (partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0)
- cctx.group().metrics().decrementIndexBuildCountPartitionsLeft();
- }
- }
+ buildIdxCompoundFut.listen(fut -> buildIdxFut.onDone(fut.error()));
- /**
- * Process single key.
- *
- * @param key Key.
- * @param clo Closure.
- * @throws IgniteCheckedException If failed.
- */
- private void processKey(KeyCacheObject key, SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
- while (true) {
- try {
- checkCancelled();
-
- GridCacheEntryEx entry = cctx.cache().entryEx(key);
-
- try {
- entry.updateIndex(rowFilter, clo);
- }
- finally {
- entry.touch();
- }
-
- break;
- }
- catch (GridDhtInvalidPartitionException ignore) {
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op.
- }
- }
+ buildIdxCompoundFut.markInitialized();
}
/**
- * Check if visit process is not cancelled.
- *
- * @throws IgniteCheckedException If cancelled.
+ * This method is called before creating or rebuilding indexes.
+ * Used only for test.
*/
- private void checkCancelled() throws IgniteCheckedException {
- if (cancel != null && cancel.isCancelled())
- throw new IgniteCheckedException("Index creation was cancelled.");
- }
-
- /**
- * Resets value of partitions count to be processed and update metrics.
- */
- private void resetPartitionsCount() {
- int cnt = partsCnt.getAndSet(0);
-
- if (cnt > 0)
- cctx.group().metrics().addIndexBuildCountPartitionsLeft(-cnt);
+ protected void beforeExecute(){
+ //no-op
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaIndexCacheVisitorImpl.class, this);
}
-
- /**
- * Async worker.
- */
- private class AsyncWorker extends GridWorker {
- /** Partitions. */
- private final List<GridDhtLocalPartition> parts;
-
- /** Closure. */
- private final SchemaIndexCacheVisitorClosure clo;
-
- /** Remained.. */
- private final int remainder;
-
- /** Future. */
- private final GridFutureAdapter<Void> fut;
-
- /**
- * Constructor.
- *
- * @param parts Partitions.
- * @param clo Closure.
- * @param remainder Remainder.
- * @param fut Future.
- */
- @SuppressWarnings("unchecked")
- public AsyncWorker(List<GridDhtLocalPartition> parts, SchemaIndexCacheVisitorClosure clo, int remainder,
- GridFutureAdapter<Void> fut) {
- super(cctx.igniteInstanceName(), "parallel-idx-worker-" + cctx.cache().name() + "-" + remainder,
- cctx.logger(AsyncWorker.class));
-
- this.parts = parts;
- this.clo = clo;
- this.remainder = remainder;
- this.fut = fut;
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- Throwable err = null;
-
- try {
- processPartitions(parts, clo, remainder);
- }
- catch (Throwable e) {
- err = e;
-
- U.error(log, "Error during parallel index create/rebuild.", e);
-
- stop = true;
-
- resetPartitionsCount();
- }
- finally {
- fut.onDone(err);
- }
- }
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index fe4ae05..1e75ff1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -86,6 +86,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
+ null,
cfg.getPluginProviders() != null && cfg.getPluginProviders().length > 0 ?
Arrays.asList(cfg.getPluginProviders()) : U.allPluginProviders(),
null,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0c08d3b..7fadf1a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -158,6 +158,7 @@ import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
@@ -166,8 +167,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.internal.util.worker.GridWorkerFuture;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
@@ -189,7 +188,11 @@ import org.h2.util.JdbcUtils;
import org.h2.value.DataType;
import org.jetbrains.annotations.Nullable;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
import static java.util.Collections.singletonList;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
@@ -615,7 +618,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (tx != null) {
int remaining = (int)tx.remainingTime();
- return remaining > 0 && qryTimeout > 0 ? Math.min(remaining, qryTimeout) : Math.max(remaining, qryTimeout);
+ return remaining > 0 && qryTimeout > 0 ? min(remaining, qryTimeout) : max(remaining, qryTimeout);
}
return qryTimeout;
@@ -1938,16 +1941,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+ assert nonNull(cctx);
+
// No data in fresh in-memory cache.
if (!cctx.group().persistenceEnabled())
return null;
IgnitePageStoreManager pageStore = cctx.shared().pageStore();
- assert pageStore != null;
+ assert nonNull(pageStore);
SchemaIndexCacheVisitorClosure clo;
+ String cacheName = cctx.name();
+
if (!pageStore.hasIndexStore(cctx.groupId())) {
// If there are no index store, rebuild all indexes.
clo = new IndexRebuildFullClosure(cctx.queries(), cctx.mvccEnabled());
@@ -1956,10 +1963,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Otherwise iterate over tables looking for missing indexes.
IndexRebuildPartialClosure clo0 = new IndexRebuildPartialClosure();
- for (H2TableDescriptor tblDesc : schemaMgr.tablesForCache(cctx.name())) {
- assert tblDesc.table() != null;
+ for (H2TableDescriptor tblDesc : schemaMgr.tablesForCache(cacheName)) {
+ GridH2Table tbl = tblDesc.table();
+
+ assert nonNull(tbl);
- tblDesc.table().collectIndexesForPartialRebuild(clo0);
+ tbl.collectIndexesForPartialRebuild(clo0);
}
if (clo0.hasIndexes())
@@ -1969,40 +1978,31 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
// Closure prepared, do rebuild.
- final GridWorkerFuture<?> fut = new GridWorkerFuture<>();
+ markIndexRebuild(cacheName, true);
- markIndexRebuild(cctx.name(), true);
+ GridFutureAdapter<Void> rebuildCacheIdxFut = new GridFutureAdapter<>();
- if (cctx.group().metrics() != null)
- cctx.group().metrics().addIndexBuildCountPartitionsLeft(cctx.topology().localPartitions().size());
+ rebuildCacheIdxFut.listen(fut -> {
+ Throwable err = fut.error();
- GridWorker worker = new GridWorker(ctx.igniteInstanceName(), "index-rebuild-worker-" + cctx.name(), log) {
- @Override protected void body() {
+ if (isNull(err)) {
try {
- rebuildIndexesFromHash0(cctx, clo);
-
- markIndexRebuild(cctx.name(), false);
-
- fut.onDone();
+ markIndexRebuild(cacheName, false);
}
- catch (Exception e) {
- fut.onDone(e);
- }
- catch (Throwable e) {
- U.error(log, "Failed to rebuild indexes for cache: " + cctx.name(), e);
-
- fut.onDone(e);
+ catch (Throwable t) {
+ err = t;
- throw e;
+ rebuildCacheIdxFut.onDone(t);
}
}
- };
- fut.setWorker(worker);
+ if (nonNull(err))
+ U.error(log, "Failed to rebuild indexes for cache: " + cacheName, err);
+ });
- ctx.getExecutorService().execute(worker);
+ rebuildIndexesFromHash0(cctx, clo, rebuildCacheIdxFut);
- return fut;
+ return rebuildCacheIdxFut;
}
/**
@@ -2010,13 +2010,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*
* @param cctx Cache context.
* @param clo Closure.
- * @throws IgniteCheckedException If failed.
+ * @param rebuildIdxFut Future for rebuild indexes.
*/
- protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo)
- throws IgniteCheckedException {
- SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx);
-
- visitor.visit(clo);
+ protected void rebuildIndexesFromHash0(
+ GridCacheContext cctx,
+ SchemaIndexCacheVisitorClosure clo,
+ GridFutureAdapter<Void> rebuildIdxFut
+ ) {
+ new SchemaIndexCacheVisitorImpl(cctx, null, null, rebuildIdxFut).visit(clo);
}
/**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java
index 4098616..af21440 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java
@@ -39,11 +39,9 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingSpi;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
-import static org.apache.ignite.IgniteSystemProperties.INDEX_REBUILDING_PARALLELISM;
import static org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -59,17 +57,13 @@ public class CacheGroupMetricsWithIndexBuildFailTest extends AbstractIndexingCom
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setDataStorageConfiguration(new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(10 * 1024 * 1024)
+ return super.getConfiguration(igniteInstanceName)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(10 * 1024 * 1024))
)
- );
-
- cfg.setIndexingSpi(new TestIndexingSpi());
-
- return cfg;
+ .setIndexingSpi(new TestIndexingSpi())
+ .setBuildIndexThreadPoolSize(4);
}
/**
@@ -100,7 +94,6 @@ public class CacheGroupMetricsWithIndexBuildFailTest extends AbstractIndexingCom
/** */
@Test
- @WithSystemProperty(key = INDEX_REBUILDING_PARALLELISM, value = "4")
public void testIndexRebuildCountPartitionsLeft() throws Exception {
IgniteEx ignite0 = startGrid(0);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java
index 57f69c7..e2431bd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java
@@ -171,19 +171,24 @@ public class CacheGroupMetricsWithIndexTest extends CacheGroupMetricsTest {
ignite.cluster().active(true);
+ BlockingIndexing blockingIndexing = (BlockingIndexing)ignite.context().query().getIndexing();
+
+ while (!blockingIndexing.isBlock(cacheName2) || !blockingIndexing.isBlock(cacheName3))
+ U.sleep(10);
+
MetricRegistry grpMreg = cacheGroupMetrics(0, GROUP_NAME_2).get2();
LongMetric indexBuildCountPartitionsLeft = grpMreg.findMetric("IndexBuildCountPartitionsLeft");
assertEquals(parts2 + parts3, indexBuildCountPartitionsLeft.value());
- ((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName2);
+ blockingIndexing.stopBlock(cacheName2);
ignite.cache(cacheName2).indexReadyFuture().get(30_000);
assertEquals(parts3, indexBuildCountPartitionsLeft.value());
- ((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName3);
+ blockingIndexing.stopBlock(cacheName3);
ignite.cache(cacheName3).indexReadyFuture().get(30_000);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
index 581f1b8..8278ac9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
@@ -20,24 +20,27 @@ package org.apache.ignite.internal.processors.cache.index;
import java.io.File;
import java.nio.file.Path;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.h2.engine.Session;
import org.h2.util.CloseWatcher;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitQuiet;
+
/**
* Base class for all indexing tests to check H2 connection management.
*/
@@ -104,18 +107,43 @@ public class AbstractIndexingCommonTest extends GridCommonAbstractTest {
*/
public static class BlockingIndexing extends IgniteH2Indexing {
/** */
- private final ConcurrentHashMap<String, CountDownLatch> latches = new ConcurrentHashMap<>();
+ private final Map<String, CountDownLatch> latches = new ConcurrentHashMap<>();
/** {@inheritDoc} */
- @Override protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo)
- throws IgniteCheckedException {
- String cacheName = cctx.name();
-
- latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1));
-
- U.await(latches.get(cacheName));
+ @Override protected void rebuildIndexesFromHash0(
+ GridCacheContext cctx,
+ SchemaIndexCacheVisitorClosure clo,
+ GridFutureAdapter<Void> rebuildIdxFut
+ ) {
+ CountDownLatch startThread = new CountDownLatch(1);
+
+ new Thread(() -> {
+ startThread.countDown();
+
+ new SchemaIndexCacheVisitorImpl(cctx, null, null, rebuildIdxFut) {
+ /** {@inheritDoc} */
+ @Override protected void beforeExecute() {
+ String cacheName = cctx.name();
+
+ if (log.isInfoEnabled())
+ log.info("Before execute build idx for cache=" + cacheName);
+
+ awaitQuiet(latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1)));
+ }
+ }.visit(clo);
+ }).start();
+
+ awaitQuiet(startThread);
+ }
- super.rebuildIndexesFromHash0(cctx, clo);
+ /**
+ * Returns whether creating/rebuilding an index for cache is blocked.
+ *
+ * @return {@code True} if creating/rebuilding an index for cache is
+ * blocked.
+ */
+ public boolean isBlock(String cacheName) {
+ return latches.containsKey(cacheName) && latches.get(cacheName).getCount() != 0;
}
/**
@@ -124,9 +152,7 @@ public class AbstractIndexingCommonTest extends GridCommonAbstractTest {
* @param cacheName Cache name.
*/
public void stopBlock(String cacheName) {
- CountDownLatch latch = latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1));
-
- latch.countDown();
+ latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1)).countDown();
}
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
index b4bb9e9..91db4cf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
@@ -22,8 +22,10 @@ import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
@@ -232,14 +235,23 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
private boolean firstRbld = true;
/** {@inheritDoc} */
- @Override protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo)
- throws IgniteCheckedException {
- if (!firstRbld)
- U.await(INSTANCE.rebuildLatch);
+ @Override protected void rebuildIndexesFromHash0(
+ GridCacheContext cctx,
+ SchemaIndexCacheVisitorClosure clo,
+ GridFutureAdapter<Void> rebuildIdxFut
+ ) {
+ if (!firstRbld) {
+ try {
+ U.await(INSTANCE.rebuildLatch);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
else
firstRbld = false;
- super.rebuildIndexesFromHash0(cctx, clo);
+ super.rebuildIndexesFromHash0(cctx, clo, rebuildIdxFut);
}
}
}