You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/02/21 08:43:35 UTC

[ignite] branch master updated: IGNITE-12684 Use dedicated pool for indexes rebuilding. - Fixes #7432.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f52350c  IGNITE-12684 Use dedicated pool for indexes rebuilding. - Fixes #7432.
f52350c is described below

commit f52350c552f957e4c867bab9431b45ea1f16dc62
Author: ktkalenko <kt...@gridgain.com>
AuthorDate: Fri Feb 21 11:42:55 2020 +0300

    IGNITE-12684 Use dedicated pool for indexes rebuilding. - Fixes #7432.
    
    Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   9 -
 .../ignite/configuration/IgniteConfiguration.java  |  36 ++-
 .../apache/ignite/internal/GridKernalContext.java  |   7 +
 .../ignite/internal/GridKernalContextImpl.java     |  12 +
 .../org/apache/ignite/internal/IgniteKernal.java   |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java     |  23 ++
 .../GridCacheDatabaseSharedManager.java            |  99 +++---
 .../wal/reader/StandaloneGridKernalContext.java    |   5 +
 .../processors/query/GridQueryIndexing.java        |   2 +-
 .../processors/query/GridQueryProcessor.java       |  44 ++-
 .../schema/SchemaIndexCachePartitionWorker.java    | 270 ++++++++++++++++
 .../query/schema/SchemaIndexCacheVisitor.java      |   5 +-
 .../query/schema/SchemaIndexCacheVisitorImpl.java  | 340 ++++-----------------
 .../junits/GridTestKernalContext.java              |   1 +
 .../processors/query/h2/IgniteH2Indexing.java      |  73 ++---
 .../CacheGroupMetricsWithIndexBuildFailTest.java   |  19 +-
 .../cache/CacheGroupMetricsWithIndexTest.java      |   9 +-
 .../cache/index/AbstractIndexingCommonTest.java    |  54 +++-
 .../query/h2/GridIndexRebuildSelfTest.java         |  22 +-
 19 files changed, 611 insertions(+), 422 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 22b998e..d7596d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1247,15 +1247,6 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP = "IGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP";
 
     /**
-     * Index rebuilding parallelism level. It sets a number of threads will be used for index rebuilding.
-     * Zero value means default should be used.
-     * Default value is calculated as <code>CPU count / 4</code> with upper limit of <code>4</code>.
-     * <p>
-     * Note: Number of threads is bounded within the range from <code>1</code> up to <code>CPU count</code>.
-     */
-    public static final String INDEX_REBUILDING_PARALLELISM = "INDEX_REBUILDING_PARALLELISM";
-
-    /**
      * Threshold timeout for long transactions, if transaction exceeds it, it will be dumped in log with
      * information about how much time did it spent in system time (time while aquiring locks, preparing,
      * commiting, etc) and user time (time when client node runs some code while holding transaction and not
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index a086e04..2ba4e29 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -86,6 +86,8 @@ import org.apache.ignite.spi.systemview.SystemViewExporterSpi;
 import org.apache.ignite.ssl.SslContextFactory;
 import org.jetbrains.annotations.Nullable;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.STOP;
 
 /**
@@ -154,7 +156,7 @@ public class IgniteConfiguration {
     public static final int AVAILABLE_PROC_CNT = Runtime.getRuntime().availableProcessors();
 
     /** Default core size of public thread pool. */
-    public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT);
+    public static final int DFLT_PUBLIC_THREAD_CNT = max(8, AVAILABLE_PROC_CNT);
 
     /** Default size of data streamer thread pool. */
     public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
@@ -180,6 +182,9 @@ public class IgniteConfiguration {
     /** Default size of query thread pool. */
     public static final int DFLT_QUERY_THREAD_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
 
+    /** Default size of index create/rebuild thread pool. */
+    public static final int DFLT_BUILD_IDX_THREAD_POOL_SIZE = min(4, max(1, AVAILABLE_PROC_CNT / 4));
+
     /** Default Ignite thread keep alive time. */
     public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
 
@@ -302,6 +307,9 @@ public class IgniteConfiguration {
     /** Query pool size. */
     private int qryPoolSize = DFLT_QUERY_THREAD_POOL_SIZE;
 
+    /** Index create/rebuild pool size. */
+    private int buildIdxPoolSize = DFLT_BUILD_IDX_THREAD_POOL_SIZE;
+
     /** SQL query history size. */
     private int sqlQryHistSize = DFLT_SQL_QUERY_HISTORY_SIZE;
 
@@ -693,6 +701,7 @@ public class IgniteConfiguration {
         pluginProvs = cfg.getPluginProviders();
         pubPoolSize = cfg.getPublicThreadPoolSize();
         qryPoolSize = cfg.getQueryThreadPoolSize();
+        buildIdxPoolSize = cfg.getBuildIndexThreadPoolSize();
         rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
         rebalanceTimeout = cfg.getRebalanceTimeout();
         rebalanceBatchesPrefetchCnt = cfg.getRebalanceBatchesPrefetchCount();
@@ -1082,6 +1091,31 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool for create/rebuild index.
+     * <p>
+     * If not provided, executor service will have size
+     * {@link #DFLT_BUILD_IDX_THREAD_POOL_SIZE}.
+     *
+     * @return Thread pool size for create/rebuild index.
+     */
+    public int getBuildIndexThreadPoolSize() {
+        return buildIdxPoolSize;
+    }
+
+    /**
+     * Sets index create/rebuild thread pool size to use within grid.
+     *
+     * @param poolSize Thread pool size to use within grid.
+     * @return {@code this} for chaining.
+     * @see IgniteConfiguration#getBuildIndexThreadPoolSize()
+     */
+    public IgniteConfiguration setBuildIndexThreadPoolSize(int poolSize) {
+        buildIdxPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Number of SQL query history elements to keep in memory. If not provided, then default value {@link
      * #DFLT_SQL_QUERY_HISTORY_SIZE} is used. If provided value is less or equals 0, then gathering SQL query history
      * will be switched off.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 5be2327..af9f777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -776,4 +776,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      * @return Local continuous tasks processor.
      */
     public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor();
+
+    /**
+     * Returns the thread pool used for creating/rebuilding indexes.
+     *
+     * @return Thread pool for create/rebuild indexes.
+     */
+    public ExecutorService buildIndexExecutorService();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4bd3aa9..7196dea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -383,6 +383,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     @GridToStringExclude
     protected ExecutorService idxExecSvc;
 
+    /** Thread pool for create/rebuild indexes. */
+    @GridToStringExclude
+    private ExecutorService buildIdxExecSvc;
+
     /** */
     @GridToStringExclude
     protected IgniteStripedThreadPoolExecutor callbackExecSvc;
@@ -490,6 +494,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
      * @param restExecSvc REST executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
+     * @param buildIdxExecSvc Create/rebuild indexes executor service.
      * @param callbackExecSvc Callback executor service.
      * @param qryExecSvc Query executor service.
      * @param schemaExecSvc Schema executor service.
@@ -519,6 +524,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
+        @Nullable ExecutorService buildIdxExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
         ExecutorService qryExecSvc,
         ExecutorService schemaExecSvc,
@@ -550,6 +556,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.restExecSvc = restExecSvc;
         this.affExecSvc = affExecSvc;
         this.idxExecSvc = idxExecSvc;
+        this.buildIdxExecSvc = buildIdxExecSvc;
         this.callbackExecSvc = callbackExecSvc;
         this.qryExecSvc = qryExecSvc;
         this.schemaExecSvc = schemaExecSvc;
@@ -1317,4 +1324,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
         return durableBackgroundTasksProcessor;
     }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService buildIndexExecutorService() {
+        return buildIdxExecSvc;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 9892574..96c04b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -968,6 +968,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @param restExecSvc Reset executor service.
      * @param affExecSvc Affinity executor service.
      * @param idxExecSvc Indexing executor service.
+     * @param buildIdxExecSvc Create/rebuild indexes executor service.
      * @param callbackExecSvc Callback executor service.
      * @param qryExecSvc Query executor service.
      * @param schemaExecSvc Schema executor service.
@@ -993,6 +994,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
+        @Nullable ExecutorService buildIdxExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
         ExecutorService qryExecSvc,
         ExecutorService schemaExecSvc,
@@ -1119,6 +1121,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 restExecSvc,
                 affExecSvc,
                 idxExecSvc,
+                buildIdxExecSvc,
                 callbackExecSvc,
                 qryExecSvc,
                 schemaExecSvc,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 42194c5..f9f031f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1565,6 +1565,9 @@ public class IgnitionEx {
         /** Indexing pool. */
         private ThreadPoolExecutor idxExecSvc;
 
+        /** Thread pool for create/rebuild indexes. */
+        private ThreadPoolExecutor buildIdxExecSvc;
+
         /** Continuous query executor service. */
         private IgniteStripedThreadPoolExecutor callbackExecSvc;
 
@@ -1950,6 +1953,21 @@ public class IgnitionEx {
                     GridIoPolicy.IDX_POOL,
                     oomeHnd
                 );
+
+                int buildIdxThreadPoolSize = cfg.getBuildIndexThreadPoolSize();
+
+                validateThreadPoolSize(buildIdxThreadPoolSize, "build-idx");
+
+                buildIdxExecSvc = new IgniteThreadPoolExecutor(
+                    "build-idx-runner",
+                    cfg.getIgniteInstanceName(),
+                    0,
+                    buildIdxThreadPoolSize,
+                    0,
+                    new LinkedBlockingQueue<>(),
+                    GridIoPolicy.UNDEFINED,
+                    oomeHnd
+                );
             }
 
             validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
@@ -2047,6 +2065,7 @@ public class IgnitionEx {
                     restExecSvc,
                     affExecSvc,
                     idxExecSvc,
+                    buildIdxExecSvc,
                     callbackExecSvc,
                     qryExecSvc,
                     schemaExecSvc,
@@ -2756,6 +2775,10 @@ public class IgnitionEx {
 
             idxExecSvc = null;
 
+            U.shutdownNow(getClass(), buildIdxExecSvc, log);
+
+            buildIdxExecSvc = null;
+
             U.shutdownNow(getClass(), callbackExecSvc, log);
 
             callbackExecSvc = null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index b11bb62..0c5c378 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -76,7 +76,6 @@ import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CheckpointWriteOrder;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -167,7 +166,6 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
@@ -192,6 +190,8 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedHashMap;
 
 import static java.nio.file.StandardOpenOption.READ;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
@@ -1489,68 +1489,79 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /** {@inheritDoc} */
-    @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
+    @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture exchangeFut) {
         GridQueryProcessor qryProc = cctx.kernalContext().query();
 
-        if (qryProc.moduleEnabled()) {
-            GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
-                cctx.cacheContexts().size(),
-                () -> log().info("Indexes rebuilding completed for all caches."),
-                1  //need at least 1 index rebuilded to print message about rebuilding completion
-            );
+        if (!qryProc.moduleEnabled())
+            return;
 
-            for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
-                if (cacheCtx.startTopologyVersion().equals(fut.initialVersion())) {
-                    final int cacheId = cacheCtx.cacheId();
-                    final GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
+        Collection<GridCacheContext> cacheContexts = cctx.cacheContexts();
 
-                    IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
+        GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
+            cacheContexts.size(),
+            () -> {
+                if (log.isInfoEnabled())
+                    log.info("Indexes rebuilding completed for all caches.");
+            },
+            1  // Need at least 1 index rebuilt to print the message about rebuilding completion.
+        );
 
-                    if (rebuildFut != null) {
-                        log().info("Started indexes rebuilding for cache [name=" + cacheCtx.name()
-                            + ", grpName=" + cacheCtx.group().name() + ']');
+        for (GridCacheContext cacheCtx : cacheContexts) {
+            if (!cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()))
+                continue;
 
-                        assert usrFut != null : "Missing user future for cache: " + cacheCtx.name();
+            int cacheId = cacheCtx.cacheId();
+            GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
 
-                        rebuildFut.listen(new CI1<IgniteInternalFuture>() {
-                            @Override public void apply(IgniteInternalFuture fut) {
-                                idxRebuildFuts.remove(cacheId, usrFut);
+            IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
 
-                                Throwable err = fut.error();
+            if (nonNull(rebuildFut)) {
+                if (log.isInfoEnabled())
+                    log.info("Started indexes rebuilding for cache [" + cacheInfo(cacheCtx) + ']');
 
-                                usrFut.onDone(err);
+                assert nonNull(usrFut) : "Missing user future for cache: " + cacheCtx.name();
 
-                                CacheConfiguration ccfg = cacheCtx.config();
+                rebuildFut.listen(fut -> {
+                    idxRebuildFuts.remove(cacheId, usrFut);
 
-                                if (ccfg != null) {
-                                    if (err == null)
-                                        log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName()
-                                            + ", grpName=" + ccfg.getGroupName() + ']');
-                                    else {
-                                        if (!(err instanceof NodeStoppingException))
-                                            log().error("Failed to rebuild indexes for cache  [name=" + ccfg.getName()
-                                                + ", grpName=" + ccfg.getGroupName() + ']', err);
-                                    }
-                                }
+                    Throwable err = fut.error();
 
-                                rebuildIndexesCompleteCntr.countDown(true);
-                            }
-                        });
+                    usrFut.onDone(err);
+
+                    if (isNull(err)) {
+                        if (log.isInfoEnabled())
+                            log.info("Finished indexes rebuilding for cache [" + cacheInfo(cacheCtx) + ']');
                     }
                     else {
-                        if (usrFut != null) {
-                            idxRebuildFuts.remove(cacheId, usrFut);
+                        if (!(err instanceof NodeStoppingException))
+                            log.error("Failed to rebuild indexes for cache [" + cacheInfo(cacheCtx) + ']', err);
+                    }
 
-                            usrFut.onDone();
+                    rebuildIndexesCompleteCntr.countDown(true);
+                });
+            }
+            else if (nonNull(usrFut)) {
+                idxRebuildFuts.remove(cacheId, usrFut);
 
-                            rebuildIndexesCompleteCntr.countDown(false);
-                        }
-                    }
-                }
+                usrFut.onDone();
+
+                rebuildIndexesCompleteCntr.countDown(false);
             }
         }
     }
 
+    /**
+     * Return short information about cache.
+     *
+     * @param cacheCtx Cache context.
+     * @return Short cache info.
+     */
+    private String cacheInfo(GridCacheContext cacheCtx) {
+        assert nonNull(cacheCtx);
+
+        return "name=" + cacheCtx.name() + ", grpName=" + cacheCtx.group().name();
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
         return idxRebuildFuts.get(cacheId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e465545..88203c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -742,4 +742,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     @Override public DurableBackgroundTasksProcessor durableBackgroundTasksProcessor() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService buildIndexExecutorService() {
+        return null;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 4ca363e..8e11099 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -332,7 +332,7 @@ public interface GridQueryIndexing {
      * @param cctx Cache context.
      * @return Future completed when index rebuild finished.
      */
-    public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx);
+    IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx);
 
     /**
      * Mark as rebuild needed for the given cache.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 5dfffee..01cc322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
 import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
 import org.apache.ignite.internal.processors.query.schema.SchemaOperationClientFuture;
@@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -1570,7 +1572,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         try {
             if (op instanceof SchemaIndexCreateOperation) {
-                final SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+                SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
 
                 QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index());
 
@@ -1579,11 +1581,40 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 if (cacheInfo.isCacheContextInited()) {
                     GridCacheContext cctx = cacheInfo.cacheContext();
 
-                    SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName());
+                    int buildIdxPoolSize = ctx.config().getBuildIndexThreadPoolSize();
+                    int parallel = op0.parallel();
 
-                    cctx.group().metrics().addIndexBuildCountPartitionsLeft(cctx.topology().localPartitions().size());
+                    if (parallel > buildIdxPoolSize) {
+                        String idxName = op0.indexName();
 
-                    visitor = new SchemaIndexCacheVisitorImpl(cctx, filter, cancelTok, op0.parallel());
+                        log.warning("Provided parallelism " + parallel + " for creation of index " + idxName +
+                            " is greater than the number of index building threads. Will use " + buildIdxPoolSize +
+                            " threads to build index. Increase by IgniteConfiguration.setBuildIndexThreadPoolSize" +
+                            " and restart the node if you want to use more threads. [tableName=" + op0.tableName() +
+                            ", indexName=" + idxName + ", requestedParallelism=" + parallel + ", buildIndexPoolSize=" +
+                            buildIdxPoolSize + "]");
+                    }
+
+                    GridFutureAdapter<Void> createIdxFut = new GridFutureAdapter<>();
+
+                    visitor = new SchemaIndexCacheVisitorImpl(
+                        cacheInfo.cacheContext(),
+                        new TableCacheFilter(cctx, op0.tableName()),
+                        cancelTok,
+                        createIdxFut
+                    ) {
+                        /** {@inheritDoc} */
+                        @Override public void visit(SchemaIndexCacheVisitorClosure clo) {
+                            super.visit(clo);
+
+                            try {
+                                buildIdxFut.get();
+                            }
+                            catch (Exception e) {
+                                throw new IgniteException(e);
+                            }
+                        }
+                    };
                 }
                 else
                     //For not started caches we shouldn't add any data to index.
@@ -1974,6 +2005,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Future that will be completed when rebuilding is finished.
      */
     public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+        assert nonNull(cctx);
+
         // Indexing module is disabled, nothing to rebuild.
         if (rebuildIsMeaningless(cctx))
             return null;
@@ -1992,9 +2025,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (empty)
             return null;
 
-        if (!busyLock.enterBusy())
+        if (!busyLock.enterBusy()) {
             return new GridFinishedFuture<>(new NodeStoppingException("Failed to rebuild indexes from hash " +
                 "(grid is stopping)."));
+        }
 
         try {
             return idx.rebuildIndexesFromHash(cctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
new file mode 100644
index 0000000..c4657d0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Objects.nonNull;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY;
+
+/**
+ * Worker for creating/rebuilding indexes for cache per partition.
+ */
+public class SchemaIndexCachePartitionWorker extends GridWorker {
+    /** Count of rows, being processed within a single checkpoint lock. */
+    private static final int BATCH_SIZE = 1000;
+
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Stop flag between all workers for one cache. */
+    private final AtomicBoolean stop;
+
+    /** Cancellation token between all workers for all caches. */
+    private final SchemaIndexOperationCancellationToken cancel;
+
+    /** Index closure. */
+    private final SchemaIndexCacheVisitorClosure clo;
+
+    /** Partition. */
+    private final GridDhtLocalPartition locPart;
+
+    /** Worker future. */
+    private final GridFutureAdapter<Void> fut;
+
+    /** Row filter. */
+    private final SchemaIndexCacheFilter rowFilter;
+
+    /** Count of partitions to be processed. */
+    private final AtomicInteger partsCnt;
+
+    /**
+     * Creates a worker that builds indexes over a single partition of the given cache.
+     *
+     * @param cctx Cache context.
+     * @param locPart Partition.
+     * @param stop Stop flag between all workers for one cache.
+     * @param cancel Cancellation token between all workers for all caches.
+     * @param clo Index closure.
+     * @param fut Worker future, completed when the worker body finishes.
+     * @param rowFilter Row filter.
+     * @param partsCnt Count of partitions to be processed.
+     */
+    public SchemaIndexCachePartitionWorker(
+        GridCacheContext cctx,
+        GridDhtLocalPartition locPart,
+        AtomicBoolean stop,
+        SchemaIndexOperationCancellationToken cancel,
+        SchemaIndexCacheVisitorClosure clo,
+        GridFutureAdapter<Void> fut,
+        @Nullable SchemaIndexCacheFilter rowFilter,
+        AtomicInteger partsCnt
+    ) {
+        super(
+            cctx.igniteInstanceName(),
+            "parallel-idx-worker-" + cctx.cache().name() + "-part-" + locPart.id(),
+            cctx.logger(SchemaIndexCachePartitionWorker.class)
+        );
+
+        assert nonNull(stop);
+        assert nonNull(clo);
+        assert nonNull(fut);
+        assert nonNull(partsCnt);
+
+        this.cctx = cctx;
+        this.locPart = locPart;
+        this.stop = stop;
+        this.clo = clo;
+        this.fut = fut;
+        this.partsCnt = partsCnt;
+
+        // Optional: rowFilter is @Nullable and cancel is null-checked in checkCancelled().
+        this.cancel = cancel;
+        this.rowFilter = rowFilter;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        Throwable failure = null;
+
+        try {
+            processPartition();
+        }
+        catch (Throwable t) {
+            // A java.lang.Error is wrapped into IgniteException, any other throwable is kept as is.
+            failure = t instanceof Error ? new IgniteException(t) : t;
+            U.error(log, "Error during create/rebuild index for partition: " + locPart.id(), t);
+
+            // Raise the stop flag shared between all workers of this cache.
+            stop.set(true);
+
+            int left = partsCnt.getAndSet(0);
+            if (left > 0)
+                cctx.group().metrics().addIndexBuildCountPartitionsLeft(-left);
+        }
+        finally {
+            fut.onDone(failure);
+        }
+    }
+
+    /**
+     * Reserves the partition and passes the cache keys stored in it to {@link #processKey}.
+     * Returns without doing anything if a stop was requested, the node is stopping or reservation failed.
+     * @throws IgniteCheckedException If index creation was cancelled or processing of a key failed.
+     */
+    private void processPartition() throws IgniteCheckedException {
+        if (stop.get() || stopNode())
+            return;
+
+        checkCancelled();
+
+        boolean reserved = false;
+        // Reserve only a partition in the OWNING, RENTING or MOVING state.
+        GridDhtPartitionState partState = locPart.state();
+        if (partState != EVICTED)
+            reserved = (partState == OWNING || partState == RENTING || partState == MOVING) && locPart.reserve();
+
+        if (!reserved)
+            return;
+
+        try {
+            GridCursor<? extends CacheDataRow> cursor = locPart.dataStore().cursor(
+                cctx.cacheId(),
+                null,
+                null,
+                KEY_ONLY
+            );
+            // True while this thread holds the checkpoint read lock.
+            boolean locked = false;
+
+            try {
+                int cntr = 0;
+
+                while (cursor.next() && !stop.get() && !stopNode()) {
+                    KeyCacheObject key = cursor.get().key();
+
+                    if (!locked) {
+                        cctx.shared().database().checkpointReadLock();
+
+                        locked = true;
+                    }
+
+                    processKey(key);
+                    // Give up the checkpoint read lock after every BATCH_SIZE processed keys.
+                    if (++cntr % BATCH_SIZE == 0) {
+                        cctx.shared().database().checkpointReadUnlock();
+
+                        locked = false;
+                    }
+                    // Do not continue with a partition that has moved to the RENTING state.
+                    if (locPart.state() == RENTING)
+                        break;
+                }
+            }
+            finally {
+                if (locked)
+                    cctx.shared().database().checkpointReadUnlock();
+            }
+        }
+        finally {
+            locPart.release();
+            // Keep the metric in sync with the shared counter, which never drops below zero.
+            if (partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0)
+                cctx.group().metrics().decrementIndexBuildCountPartitionsLeft();
+        }
+    }
+
+    /**
+     * Updates index for the entry of the given key, retrying when the obtained entry turns out to be removed.
+     *
+     * @param key Key.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processKey(KeyCacheObject key) throws IgniteCheckedException {
+        assert nonNull(key);
+
+        for (;;) {
+            try {
+                checkCancelled();
+
+                GridCacheEntryEx cacheEntry = cctx.cache().entryEx(key);
+
+                try {
+                    cacheEntry.updateIndex(rowFilter, clo);
+                }
+                finally {
+                    cacheEntry.touch();
+                }
+
+                return;
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return;
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // Obtained entry was removed, loop again to get a fresh one.
+            }
+        }
+    }
+
+    /**
+     * Fails the current operation if a cancellation token is set and has been cancelled.
+     *
+     * @throws IgniteCheckedException If cancelled.
+     */
+    private void checkCancelled() throws IgniteCheckedException {
+        if (cancel != null && cancel.isCancelled())
+            throw new IgniteCheckedException("Index creation was cancelled.");
+    }
+
+    /**
+     * Checks whether the node is in the process of stopping.
+     *
+     * @return {@code True} if node is in the process of stopping.
+     */
+    private boolean stopNode() {
+        return cctx.kernalContext().isStopping();
+    }
+
+    /** {@inheritDoc} Delegates to {@code S.toString} for this worker class. */
+    @Override public String toString() {
+        return S.toString(SchemaIndexCachePartitionWorker.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
index 3321e66..3cdf1d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.schema;
 
-import org.apache.ignite.IgniteCheckedException;
-
 /**
  * Closure that internally applies given {@link SchemaIndexCacheVisitorClosure} to some set of entries.
  */
@@ -27,7 +25,6 @@ public interface SchemaIndexCacheVisitor {
      * Visit cache entries and pass them to closure.
      *
      * @param clo Closure.
-     * @throws IgniteCheckedException If failed.
      */
-    public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException;
+    void visit(SchemaIndexCacheVisitorClosure clo);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 5a54b81..337ae15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -18,44 +18,25 @@
 package org.apache.ignite.internal.processors.query.schema;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.internal.util.worker.GridWorkerFuture;
+import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.INDEX_REBUILDING_PARALLELISM;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static java.util.Objects.nonNull;
 
 /**
- * Traversor operating all primary and backup partitions of given cache.
+ * Visitor that creates/rebuilds indexes in parallel by partition for a given cache.
  */
 public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
-    /** Default degree of parallelism for rebuilding indexes. */
-    private static final int DFLT_INDEX_REBUILDING_PARALLELISM;
-
-    /** Count of rows, being processed within a single checkpoint lock. */
-    private static final int BATCH_SIZE = 1000;
-
     /** Cache context. */
     private final GridCacheContext cctx;
 
@@ -65,315 +46,98 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
     /** Cancellation token. */
     private final SchemaIndexOperationCancellationToken cancel;
 
-    /** Parallelism. */
-    private final int parallelism;
-
-    /** Whether to stop the process. */
-    private volatile boolean stop;
-
-    /** Count of partitions to be processed. */
-    private final AtomicInteger partsCnt = new AtomicInteger();
+    /** Future for create/rebuild index. */
+    protected final GridFutureAdapter<Void> buildIdxFut;
 
     /** Logger. */
     protected IgniteLogger log;
 
-    static {
-        int parallelism = IgniteSystemProperties.getInteger(INDEX_REBUILDING_PARALLELISM, 0);
-
-        // Parallelism lvl is bounded to range of [1, CPUs count]
-        if (parallelism > 0)
-            DFLT_INDEX_REBUILDING_PARALLELISM = Math.min(parallelism, Runtime.getRuntime().availableProcessors());
-        else
-            DFLT_INDEX_REBUILDING_PARALLELISM = Math.min(4, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
-    }
-
-    /**
-     * Constructor.
-     *  @param cctx Cache context.
-     */
-    public SchemaIndexCacheVisitorImpl(GridCacheContext cctx) {
-        this(cctx, null, null, 0);
-    }
-
     /**
      * Constructor.
      *
      * @param cctx Cache context.
      * @param rowFilter Row filter.
      * @param cancel Cancellation token.
-     * @param parallelism Degree of parallelism.
+     * @param buildIdxFut Future for create/rebuild index.
      */
-    public SchemaIndexCacheVisitorImpl(GridCacheContext cctx, SchemaIndexCacheFilter rowFilter,
-        SchemaIndexOperationCancellationToken cancel, int parallelism) {
-        this.rowFilter = rowFilter;
-        this.cancel = cancel;
-
-        // Parallelism lvl is bounded to range of [1, CPUs count]
-        if (parallelism > 0)
-            this.parallelism = Math.min(Runtime.getRuntime().availableProcessors(), parallelism);
-        else
-            this.parallelism = DFLT_INDEX_REBUILDING_PARALLELISM;
+    public SchemaIndexCacheVisitorImpl(
+        GridCacheContext cctx,
+        @Nullable SchemaIndexCacheFilter rowFilter,
+        @Nullable SchemaIndexOperationCancellationToken cancel,
+        GridFutureAdapter<Void> buildIdxFut
+    ) {
+        assert nonNull(cctx);
+        assert nonNull(buildIdxFut);
 
         if (cctx.isNear())
             cctx = ((GridNearCacheAdapter)cctx.cache()).dht().context();
 
         this.cctx = cctx;
+        this.buildIdxFut = buildIdxFut;
+
+        this.cancel = cancel;
+        this.rowFilter = rowFilter;
 
         log = cctx.kernalContext().log(getClass());
     }
 
     /** {@inheritDoc} */
-    @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
-        assert clo != null;
-
-        List<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
-
-        if (parts.isEmpty())
-            return;
-
-        partsCnt.set(parts.size());
+    @Override public void visit(SchemaIndexCacheVisitorClosure clo) {
+        assert nonNull(clo);
 
-        GridCompoundFuture<Void, Void> fut = null;
+        List<GridDhtLocalPartition> locParts = cctx.topology().localPartitions();
 
-        if (parallelism > 1) {
-            fut = new GridCompoundFuture<>();
+        if (locParts.isEmpty()) {
+            buildIdxFut.onDone();
 
-            for (int i = 1; i < parallelism; i++)
-                fut.add(processPartitionsAsync(parts, clo, i));
-
-            fut.markInitialized();
-        }
-
-        try {
-            processPartitions(parts, clo, 0);
-        }
-        catch (Throwable e) {
-            U.error(log, "Error during parallel index create/rebuild.", e);
-
-            stop = true;
-
-            resetPartitionsCount();
-
-            throw e;
-        }
-
-        if (fut != null)
-            fut.get();
-    }
-
-    /**
-     * Process partitions asynchronously.
-     *
-     * @param parts Partitions.
-     * @param clo Closure.
-     * @param remainder Remainder.
-     * @return Future.
-     */
-    private GridFutureAdapter<Void> processPartitionsAsync(List<GridDhtLocalPartition> parts,
-        SchemaIndexCacheVisitorClosure clo, int remainder) {
-        GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
-
-        AsyncWorker worker = new AsyncWorker(parts, clo, remainder, fut);
-
-        new IgniteThread(worker).start();
-
-        return fut;
-    }
-
-    /**
-     * Process partitions.
-     *
-     * @param parts Partitions.
-     * @param clo Closure.
-     * @param remainder Remainder.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void processPartitions(List<GridDhtLocalPartition> parts, SchemaIndexCacheVisitorClosure clo,
-        int remainder)
-        throws IgniteCheckedException {
-        for (int i = 0, size = parts.size(); i < size; i++) {
-            if (stop)
-                break;
-
-            if ((i % parallelism) == remainder)
-                processPartition(parts.get(i), clo);
-        }
-    }
-
-    /**
-     * Process partition.
-     *
-     * @param part Partition.
-     * @param clo Index closure.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void processPartition(GridDhtLocalPartition part, SchemaIndexCacheVisitorClosure clo)
-        throws IgniteCheckedException {
-        checkCancelled();
-
-        boolean reserved = false;
-
-        if (part != null && part.state() != EVICTED)
-            reserved = (part.state() == OWNING || part.state() == RENTING || part.state() == MOVING) && part.reserve();
-
-        if (!reserved)
             return;
+        }
 
-        try {
-            GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor(cctx.cacheId(), null, null,
-                CacheDataRowAdapter.RowData.KEY_ONLY);
-
-            boolean locked = false;
+        cctx.group().metrics().addIndexBuildCountPartitionsLeft(locParts.size());
 
-            try {
-                int cntr = 0;
+        beforeExecute();
 
-                while (cursor.next() && !stop) {
-                    KeyCacheObject key = cursor.get().key();
+        AtomicInteger partsCnt = new AtomicInteger(locParts.size());
 
-                    if (!locked) {
-                        cctx.shared().database().checkpointReadLock();
+        AtomicBoolean stop = new AtomicBoolean();
 
-                        locked = true;
-                    }
+        GridCompoundFuture<Void, Void> buildIdxCompoundFut = new GridCompoundFuture<>();
 
-                    processKey(key, clo);
+        for (GridDhtLocalPartition locPart : locParts) {
+            GridWorkerFuture<Void> workerFut = new GridWorkerFuture<>();
 
-                    if (++cntr % BATCH_SIZE == 0) {
-                        cctx.shared().database().checkpointReadUnlock();
+            GridWorker worker = new SchemaIndexCachePartitionWorker(
+                cctx,
+                locPart,
+                stop,
+                cancel,
+                clo,
+                workerFut,
+                rowFilter,
+                partsCnt
+            );
 
-                        locked = false;
-                    }
+            workerFut.setWorker(worker);
+            buildIdxCompoundFut.add(workerFut);
 
-                    if (part.state() == RENTING)
-                        break;
-                }
-            }
-            finally {
-                if (locked)
-                    cctx.shared().database().checkpointReadUnlock();
-            }
+            cctx.kernalContext().buildIndexExecutorService().execute(worker);
         }
-        finally {
-            part.release();
 
-            if (partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0)
-                cctx.group().metrics().decrementIndexBuildCountPartitionsLeft();
-        }
-    }
+        buildIdxCompoundFut.listen(fut -> buildIdxFut.onDone(fut.error()));
 
-    /**
-     * Process single key.
-     *
-     * @param key Key.
-     * @param clo Closure.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void processKey(KeyCacheObject key, SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
-        while (true) {
-            try {
-                checkCancelled();
-
-                GridCacheEntryEx entry = cctx.cache().entryEx(key);
-
-                try {
-                    entry.updateIndex(rowFilter, clo);
-                }
-                finally {
-                    entry.touch();
-                }
-
-                break;
-            }
-            catch (GridDhtInvalidPartitionException ignore) {
-                break;
-            }
-            catch (GridCacheEntryRemovedException ignored) {
-                // No-op.
-            }
-        }
+        buildIdxCompoundFut.markInitialized();
     }
 
     /**
-     * Check if visit process is not cancelled.
-     *
-     * @throws IgniteCheckedException If cancelled.
+     * This method is called before creating or rebuilding indexes.
+     * Used only in tests.
      */
-    private void checkCancelled() throws IgniteCheckedException {
-        if (cancel != null && cancel.isCancelled())
-            throw new IgniteCheckedException("Index creation was cancelled.");
-    }
-
-    /**
-     * Resets value of partitions count to be processed and update metrics.
-     */
-    private void resetPartitionsCount() {
-        int cnt = partsCnt.getAndSet(0);
-
-        if (cnt > 0)
-            cctx.group().metrics().addIndexBuildCountPartitionsLeft(-cnt);
+    protected void beforeExecute(){
+        //no-op
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SchemaIndexCacheVisitorImpl.class, this);
     }
-
-    /**
-     * Async worker.
-     */
-    private class AsyncWorker extends GridWorker {
-        /** Partitions. */
-        private final List<GridDhtLocalPartition> parts;
-
-        /** Closure. */
-        private final SchemaIndexCacheVisitorClosure clo;
-
-        /** Remained.. */
-        private final int remainder;
-
-        /** Future. */
-        private final GridFutureAdapter<Void> fut;
-
-        /**
-         * Constructor.
-         *
-         * @param parts Partitions.
-         * @param clo Closure.
-         * @param remainder Remainder.
-         * @param fut Future.
-         */
-        @SuppressWarnings("unchecked")
-        public AsyncWorker(List<GridDhtLocalPartition> parts, SchemaIndexCacheVisitorClosure clo, int remainder,
-            GridFutureAdapter<Void> fut) {
-            super(cctx.igniteInstanceName(), "parallel-idx-worker-" + cctx.cache().name() + "-" + remainder,
-                cctx.logger(AsyncWorker.class));
-
-            this.parts = parts;
-            this.clo = clo;
-            this.remainder = remainder;
-            this.fut = fut;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            Throwable err = null;
-
-            try {
-                processPartitions(parts, clo, remainder);
-            }
-            catch (Throwable e) {
-                err = e;
-
-                U.error(log, "Error during parallel index create/rebuild.", e);
-
-                stop = true;
-
-                resetPartitionsCount();
-            }
-            finally {
-                fut.onDone(err);
-            }
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index fe4ae05..1e75ff1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -86,6 +86,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 cfg.getPluginProviders() != null && cfg.getPluginProviders().length > 0 ?
                     Arrays.asList(cfg.getPluginProviders()) : U.allPluginProviders(),
                 null,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0c08d3b..7fadf1a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -158,6 +158,7 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
@@ -166,8 +167,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.internal.util.worker.GridWorkerFuture;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
@@ -189,7 +188,11 @@ import org.h2.util.JdbcUtils;
 import org.h2.value.DataType;
 import org.jetbrains.annotations.Nullable;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
 import static java.util.Collections.singletonList;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
@@ -615,7 +618,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (tx != null) {
             int remaining = (int)tx.remainingTime();
 
-            return remaining > 0 && qryTimeout > 0 ? Math.min(remaining, qryTimeout) : Math.max(remaining, qryTimeout);
+            return remaining > 0 && qryTimeout > 0 ? min(remaining, qryTimeout) : max(remaining, qryTimeout);
         }
 
         return qryTimeout;
@@ -1938,16 +1941,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+        assert nonNull(cctx);
+
         // No data in fresh in-memory cache.
         if (!cctx.group().persistenceEnabled())
             return null;
 
         IgnitePageStoreManager pageStore = cctx.shared().pageStore();
 
-        assert pageStore != null;
+        assert nonNull(pageStore);
 
         SchemaIndexCacheVisitorClosure clo;
 
+        String cacheName = cctx.name();
+
         if (!pageStore.hasIndexStore(cctx.groupId())) {
             // If there are no index store, rebuild all indexes.
             clo = new IndexRebuildFullClosure(cctx.queries(), cctx.mvccEnabled());
@@ -1956,10 +1963,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             // Otherwise iterate over tables looking for missing indexes.
             IndexRebuildPartialClosure clo0 = new IndexRebuildPartialClosure();
 
-            for (H2TableDescriptor tblDesc : schemaMgr.tablesForCache(cctx.name())) {
-                assert tblDesc.table() != null;
+            for (H2TableDescriptor tblDesc : schemaMgr.tablesForCache(cacheName)) {
+                GridH2Table tbl = tblDesc.table();
+
+                assert nonNull(tbl);
 
-                tblDesc.table().collectIndexesForPartialRebuild(clo0);
+                tbl.collectIndexesForPartialRebuild(clo0);
             }
 
             if (clo0.hasIndexes())
@@ -1969,40 +1978,31 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         // Closure prepared, do rebuild.
-        final GridWorkerFuture<?> fut = new GridWorkerFuture<>();
+        markIndexRebuild(cacheName, true);
 
-        markIndexRebuild(cctx.name(), true);
+        GridFutureAdapter<Void> rebuildCacheIdxFut = new GridFutureAdapter<>();
 
-        if (cctx.group().metrics() != null)
-            cctx.group().metrics().addIndexBuildCountPartitionsLeft(cctx.topology().localPartitions().size());
+        rebuildCacheIdxFut.listen(fut -> {
+            Throwable err = fut.error();
 
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), "index-rebuild-worker-" + cctx.name(), log) {
-            @Override protected void body() {
+            if (isNull(err)) {
                 try {
-                    rebuildIndexesFromHash0(cctx, clo);
-
-                    markIndexRebuild(cctx.name(), false);
-
-                    fut.onDone();
+                    markIndexRebuild(cacheName, false);
                 }
-                catch (Exception e) {
-                    fut.onDone(e);
-                }
-                catch (Throwable e) {
-                    U.error(log, "Failed to rebuild indexes for cache: " + cctx.name(), e);
-
-                    fut.onDone(e);
+                catch (Throwable t) {
+                    err = t;
 
-                    throw e;
+                    rebuildCacheIdxFut.onDone(t);
                 }
             }
-        };
 
-        fut.setWorker(worker);
+            if (nonNull(err))
+                U.error(log, "Failed to rebuild indexes for cache: " + cacheName, err);
+        });
 
-        ctx.getExecutorService().execute(worker);
+        rebuildIndexesFromHash0(cctx, clo, rebuildCacheIdxFut);
 
-        return fut;
+        return rebuildCacheIdxFut;
     }
 
     /**
@@ -2010,13 +2010,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      *
      * @param cctx Cache context.
      * @param clo Closure.
-     * @throws IgniteCheckedException If failed.
+     * @param rebuildIdxFut Future for the index rebuild.
      */
-    protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo)
-        throws IgniteCheckedException {
-        SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx);
-
-        visitor.visit(clo);
+    protected void rebuildIndexesFromHash0(
+        GridCacheContext cctx,
+        SchemaIndexCacheVisitorClosure clo,
+        GridFutureAdapter<Void> rebuildIdxFut
+    ) {
+        new SchemaIndexCacheVisitorImpl(cctx, null, null, rebuildIdxFut).visit(clo);
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java
index 4098616..af21440 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexBuildFailTest.java
@@ -39,11 +39,9 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingSpi;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
-import static org.apache.ignite.IgniteSystemProperties.INDEX_REBUILDING_PARALLELISM;
 import static org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX;
 import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 
@@ -59,17 +57,13 @@ public class CacheGroupMetricsWithIndexBuildFailTest extends AbstractIndexingCom
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(10 * 1024 * 1024)
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(10 * 1024 * 1024))
             )
-        );
-
-        cfg.setIndexingSpi(new TestIndexingSpi());
-
-        return cfg;
+            .setIndexingSpi(new TestIndexingSpi())
+            .setBuildIndexThreadPoolSize(4);
     }
 
     /**
@@ -100,7 +94,6 @@ public class CacheGroupMetricsWithIndexBuildFailTest extends AbstractIndexingCom
 
     /** */
     @Test
-    @WithSystemProperty(key = INDEX_REBUILDING_PARALLELISM, value = "4")
     public void testIndexRebuildCountPartitionsLeft() throws Exception {
         IgniteEx ignite0 = startGrid(0);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java
index 57f69c7..e2431bd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsWithIndexTest.java
@@ -171,19 +171,24 @@ public class CacheGroupMetricsWithIndexTest extends CacheGroupMetricsTest {
 
         ignite.cluster().active(true);
 
+        BlockingIndexing blockingIndexing = (BlockingIndexing)ignite.context().query().getIndexing();
+
+        while (!blockingIndexing.isBlock(cacheName2) || !blockingIndexing.isBlock(cacheName3))
+            U.sleep(10);
+
         MetricRegistry grpMreg = cacheGroupMetrics(0, GROUP_NAME_2).get2();
 
         LongMetric indexBuildCountPartitionsLeft = grpMreg.findMetric("IndexBuildCountPartitionsLeft");
 
         assertEquals(parts2 + parts3, indexBuildCountPartitionsLeft.value());
 
-        ((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName2);
+        blockingIndexing.stopBlock(cacheName2);
 
         ignite.cache(cacheName2).indexReadyFuture().get(30_000);
 
         assertEquals(parts3, indexBuildCountPartitionsLeft.value());
 
-        ((BlockingIndexing)ignite.context().query().getIndexing()).stopBlock(cacheName3);
+        blockingIndexing.stopBlock(cacheName3);
 
         ignite.cache(cacheName3).indexReadyFuture().get(30_000);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
index 581f1b8..8278ac9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
@@ -20,24 +20,27 @@ package org.apache.ignite.internal.processors.cache.index;
 import java.io.File;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.h2.engine.Session;
 import org.h2.util.CloseWatcher;
 
+import static org.apache.ignite.internal.util.IgniteUtils.awaitQuiet;
+
 /**
  * Base class for all indexing tests to check H2 connection management.
  */
@@ -104,18 +107,43 @@ public class AbstractIndexingCommonTest extends GridCommonAbstractTest {
      */
     public static class BlockingIndexing extends IgniteH2Indexing {
         /** */
-        private final ConcurrentHashMap<String, CountDownLatch> latches = new ConcurrentHashMap<>();
+        private final Map<String, CountDownLatch> latches = new ConcurrentHashMap<>();
 
         /** {@inheritDoc} */
-        @Override protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo)
-            throws IgniteCheckedException {
-            String cacheName = cctx.name();
-
-            latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1));
-
-            U.await(latches.get(cacheName));
+        @Override protected void rebuildIndexesFromHash0(
+            GridCacheContext cctx,
+            SchemaIndexCacheVisitorClosure clo,
+            GridFutureAdapter<Void> rebuildIdxFut
+        ) {
+            CountDownLatch startThread = new CountDownLatch(1);
+            // The visitor runs on its own thread; this method only waits for that thread to start (see below).
+            new Thread(() -> {
+                startThread.countDown();
+
+                new SchemaIndexCacheVisitorImpl(cctx, null, null, rebuildIdxFut) {
+                    /** Awaits the latch kept for this cache (created here if absent); stopBlock() counts it down. */
+                    @Override protected void beforeExecute() {
+                        String cacheName = cctx.name();
+
+                        if (log.isInfoEnabled())
+                            log.info("Before execute build idx for cache=" + cacheName);
+
+                        awaitQuiet(latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1)));
+                    }
+                }.visit(clo);
+            }).start();
+            // Return only once the background thread has signalled that it is running.
+            awaitQuiet(startThread);
+        }
 
-            super.rebuildIndexesFromHash0(cctx, clo);
+        /**
+         * Returns whether creating/rebuilding an index for cache is blocked.
+         *
+         * @param cacheName Cache name.
+         * @return {@code True} if a latch exists for the cache and has not been counted down yet.
+         */
+        public boolean isBlock(String cacheName) {
+            return latches.containsKey(cacheName) && latches.get(cacheName).getCount() != 0;
         }
 
         /**
@@ -124,9 +152,7 @@ public class AbstractIndexingCommonTest extends GridCommonAbstractTest {
          * @param cacheName Cache name.
          */
         public void stopBlock(String cacheName) {
-            CountDownLatch latch = latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1));
-
-            latch.countDown();
+            latches.computeIfAbsent(cacheName, l -> new CountDownLatch(1)).countDown();
         }
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
index b4bb9e9..91db4cf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
@@ -22,8 +22,10 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Test;
@@ -232,14 +235,23 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
         private boolean firstRbld = true;
 
         /** {@inheritDoc} */
-        @Override protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo)
-            throws IgniteCheckedException {
-            if (!firstRbld)
-                U.await(INSTANCE.rebuildLatch);
+        @Override protected void rebuildIndexesFromHash0(
+            GridCacheContext cctx,
+            SchemaIndexCacheVisitorClosure clo,
+            GridFutureAdapter<Void> rebuildIdxFut
+        ) {
+            if (!firstRbld) {
+                try {
+                    U.await(INSTANCE.rebuildLatch);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
             else
                 firstRbld = false;
 
-            super.rebuildIndexesFromHash0(cctx, clo);
+            super.rebuildIndexesFromHash0(cctx, clo, rebuildIdxFut);
         }
     }
 }