You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/06 10:00:58 UTC

[ignite] branch master updated: IGNITE-11180: SQL: Proper naming for map- and reduce- related classes. This closes #6008.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 731f740  IGNITE-11180: SQL: Proper naming for map- and reduce- related classes. This closes #6008.
731f740 is described below

commit 731f74028d3cfc881f3da2fc46c0500a37d80074
Author: devozerov <vo...@gridgain.com>
AuthorDate: Wed Feb 6 13:00:39 2019 +0300

    IGNITE-11180: SQL: Proper naming for map- and reduce- related classes. This closes #6008.
---
 .../processors/query/h2/database/H2Tree.java       |   5 +-
 .../DmlDistributedUpdateRun.java}                  |  14 +-
 .../processors/query/h2/opt/GridH2ProxyIndex.java  |  50 +-----
 .../query/h2/opt/GridH2RowDescriptor.java          |   3 +-
 .../h2/opt/join/ProxyDistributedLookupBatch.java   |  76 +++++++++
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  85 ++++------
 .../query/h2/twostep/ReduceBlockList.java          | 103 ++++++++++++
 .../{GridMergeIndex.java => ReduceIndex.java}      | 180 ++++-----------------
 ...IndexIterator.java => ReduceIndexIterator.java} |  16 +-
 ...ergeIndexSorted.java => ReduceIndexSorted.java} |  20 +--
 ...IndexUnsorted.java => ReduceIndexUnsorted.java} |  14 +-
 .../query/h2/twostep/ReducePartitionMapper.java    |   4 +-
 .../query/h2/twostep/ReduceQueryRun.java           |  12 +-
 .../{GridResultPage.java => ReduceResultPage.java} |  25 +--
 .../query/h2/twostep/ReduceScanIndex.java          |  46 ++++++
 .../query/h2/twostep/ReduceSourceKey.java          |  60 +++++++
 .../{GridMergeTable.java => ReduceTable.java}      |  41 ++---
 .../query/h2/twostep/ReduceTableEngine.java        |  78 +++++++++
 ...readLocalTable.java => ReduceTableWrapper.java} |  46 +-----
 .../query/IgniteSqlSplitterSelfTest.java           |   6 +-
 ...neOrSinglePartitionsQueryOptimizationsTest.java |   2 +-
 21 files changed, 508 insertions(+), 378 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index a3d865c..53e2af1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
 import org.apache.ignite.internal.stat.IoStatisticsHolder;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -396,7 +397,7 @@ public class H2Tree extends BPlusTree<H2Row, H2Row> {
 
             inlineSizeRecomendation(row);
 
-            org.h2.result.SearchRow rowData = getRow(io, pageAddr, idx);
+            SearchRow rowData = getRow(io, pageAddr, idx);
 
             for (int i = lastIdxUsed, len = cols.length; i < len; i++) {
                 IndexColumn col = cols[i];
@@ -500,7 +501,7 @@ public class H2Tree extends BPlusTree<H2Row, H2Row> {
      * @param row Grid H2 row related to given inline indexes.
      */
     @SuppressWarnings({"ConditionalBreakInInfiniteLoop", "IfMayBeConditional"})
-    private void inlineSizeRecomendation(org.h2.result.SearchRow row) {
+    private void inlineSizeRecomendation(SearchRow row) {
         //Do the check only for put operations.
         if(!(row instanceof H2CacheRow))
             return;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
index 9e7b9ae..9982141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.query.h2.dml;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.F;
 /**
  * Context for DML operation on reducer node.
  */
-class DistributedUpdateRun {
+public class DmlDistributedUpdateRun {
     /** Expected number of responses. */
     private final int nodeCount;
 
@@ -52,7 +52,7 @@ class DistributedUpdateRun {
      *
      * @param nodeCount Number of nodes to await results from.
      */
-    DistributedUpdateRun(int nodeCount) {
+    public DmlDistributedUpdateRun(int nodeCount) {
         this.nodeCount = nodeCount;
 
         rspNodes = new HashSet<>(nodeCount);
@@ -61,7 +61,7 @@ class DistributedUpdateRun {
     /**
      * @return Result future.
      */
-    GridFutureAdapter<UpdateResult> future() {
+    public GridFutureAdapter<UpdateResult> future() {
         return fut;
     }
 
@@ -69,7 +69,7 @@ class DistributedUpdateRun {
      * Handle disconnection.
      * @param e Pre-formatted error.
      */
-    void handleDisconnect(CacheException e) {
+    public void handleDisconnect(CacheException e) {
         fut.onDone(new IgniteCheckedException("Update failed because client node have disconnected.", e));
     }
 
@@ -78,7 +78,7 @@ class DistributedUpdateRun {
      *
      * @param nodeId Node id.
      */
-    void handleNodeLeft(UUID nodeId) {
+    public void handleNodeLeft(UUID nodeId) {
         fut.onDone(new IgniteCheckedException("Update failed because map node left topology [nodeId=" + nodeId + "]"));
     }
 
@@ -88,7 +88,7 @@ class DistributedUpdateRun {
      * @param id Node id.
      * @param msg Response message.
      */
-    void handleResponse(UUID id, GridH2DmlResponse msg) {
+    public void handleResponse(UUID id, GridH2DmlResponse msg) {
         synchronized (this) {
             if (!rspNodes.add(id))
                 return; // ignore duplicated messages
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
index 95a253b..4897bd4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import org.apache.ignite.internal.processors.query.h2.opt.join.ProxyDistributedLookupBatch;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
 import org.h2.index.Cursor;
@@ -34,7 +35,6 @@ import org.h2.table.TableFilter;
 
 import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.Future;
 
 /**
  * Allows to have 'free' index for alias columns
@@ -154,7 +154,12 @@ public class GridH2ProxyIndex extends BaseIndex {
     @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
         IndexLookupBatch batch = idx.createLookupBatch(filters, filter);
 
-        return batch != null ? new ProxyIndexLookupBatch(batch) : null;
+        if (batch == null)
+            return null;
+
+        GridH2RowDescriptor rowDesc = ((GridH2Table)idx.getTable()).rowDescriptor();
+
+        return new ProxyDistributedLookupBatch(batch, rowDesc);
     }
 
     /** {@inheritDoc} */
@@ -162,45 +167,4 @@ public class GridH2ProxyIndex extends BaseIndex {
         // No-op. Will be removed when underlying index is removed
     }
 
-    /** Proxy lookup batch */
-    private class ProxyIndexLookupBatch implements IndexLookupBatch {
-
-        /** Underlying normal lookup batch */
-        private final IndexLookupBatch target;
-
-        /**
-         * Creates proxy lookup batch.
-         *
-         * @param target Underlying index lookup batch.
-         */
-        private ProxyIndexLookupBatch(IndexLookupBatch target) {
-            this.target = target;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean addSearchRows(SearchRow first, SearchRow last) {
-            GridH2RowDescriptor desc = ((GridH2Table)idx.getTable()).rowDescriptor();
-            return target.addSearchRows(desc.prepareProxyIndexRow(first), desc.prepareProxyIndexRow(last));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isBatchFull() {
-            return target.isBatchFull();
-        }
-
-        /** {@inheritDoc} */
-        @Override public List<Future<Cursor>> find() {
-            return target.find();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getPlanSQL() {
-            return target.getPlanSQL();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void reset(boolean beforeQuery) {
-            target.reset(beforeQuery);
-        }
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 652c950..6a1f7f6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.h2.message.DbException;
+import org.h2.result.SearchRow;
 import org.h2.value.DataType;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -349,7 +350,7 @@ public class GridH2RowDescriptor {
      * @param row Source row.
      * @return Result.
      */
-    public org.h2.result.SearchRow prepareProxyIndexRow(org.h2.result.SearchRow row) {
+    public SearchRow prepareProxyIndexRow(SearchRow row) {
         if (row == null)
             return null;
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java
new file mode 100644
index 0000000..652bd4b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.result.SearchRow;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * Lookip batch for proxy indexes.
+ */
+public class ProxyDistributedLookupBatch implements IndexLookupBatch {
+    /** Underlying normal lookup batch */
+    private final IndexLookupBatch delegate;
+
+    /** Row descriptor. */
+    private final GridH2RowDescriptor rowDesc;
+
+    /**
+     * Creates proxy lookup batch.
+     *
+     * @param delegate Underlying index lookup batch.
+     * @param rowDesc Row descriptor.
+     */
+    public ProxyDistributedLookupBatch(IndexLookupBatch delegate, GridH2RowDescriptor rowDesc) {
+        this.delegate = delegate;
+        this.rowDesc = rowDesc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addSearchRows(SearchRow first, SearchRow last) {
+        SearchRow firstProxy = rowDesc.prepareProxyIndexRow(first);
+        SearchRow lastProxy = rowDesc.prepareProxyIndexRow(last);
+
+        return delegate.addSearchRows(firstProxy, lastProxy);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isBatchFull() {
+        return delegate.isBatchFull();
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Future<Cursor>> find() {
+        return delegate.find();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getPlanSQL() {
+        return delegate.getPlanSQL();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset(boolean beforeQuery) {
+        delegate.reset(beforeQuery);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 5ae2806..3ff6dfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -70,6 +69,7 @@ import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
@@ -141,10 +141,10 @@ public class GridReduceQueryExecutor {
     private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap<>();
 
     /** Contexts of running DML requests. */
-    private final ConcurrentMap<Long, DistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, DmlDistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
 
     /** */
-    private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
+    private volatile List<ReduceTableWrapper> fakeTbls = Collections.emptyList();
 
     /** */
     private final Lock fakeTblsLock = new ReentrantLock();
@@ -207,7 +207,7 @@ public class GridReduceQueryExecutor {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
                 for (ReduceQueryRun r : runs.values()) {
-                    for (GridMergeIndex idx : r.indexes()) {
+                    for (ReduceIndex idx : r.indexes()) {
                         if (idx.hasSource(nodeId)) {
                             handleNodeLeft(r, nodeId);
 
@@ -216,7 +216,7 @@ public class GridReduceQueryExecutor {
                     }
                 }
 
-                for (DistributedUpdateRun r : updRuns.values())
+                for (DmlDistributedUpdateRun r : updRuns.values())
                     r.handleNodeLeft(nodeId);
 
             }
@@ -306,12 +306,12 @@ public class GridReduceQueryExecutor {
 
         final int pageSize = r.pageSize();
 
-        GridMergeIndex idx = r.indexes().get(msg.query());
+        ReduceIndex idx = r.indexes().get(msg.query());
 
-        GridResultPage page;
+        ReduceResultPage page;
 
         try {
-            page = new GridResultPage(ctx, node.id(), msg) {
+            page = new ReduceResultPage(ctx, node.id(), msg) {
                 @Override public void fetchNextPage() {
                     if (r.hasErrorOrRetry()) {
                         if (r.exception() != null)
@@ -388,7 +388,7 @@ public class GridReduceQueryExecutor {
      * @param dataPageScanEnabled If data page scan is enabled.
      * @return Rows iterator.
      */
-    @SuppressWarnings("BusyWait")
+    @SuppressWarnings({"BusyWait", "IfMayBeConditional"})
     public Iterator<List<?>> query(
         String schemaName,
         final GridCacheTwoStepQuery qry,
@@ -559,17 +559,17 @@ public class GridReduceQueryExecutor {
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable() || singlePartMode;
 
             final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
-                findFirstPartitioned(cacheIds).config().getQueryParallelism();
+                mapper.findFirstPartitioned(cacheIds).config().getQueryParallelism();
 
             int replicatedQrysCnt = 0;
 
             final Collection<ClusterNode> finalNodes = nodes;
 
             for (GridCacheSqlQuery mapQry : mapQueries) {
-                GridMergeIndex idx;
+                ReduceIndex idx;
 
                 if (!skipMergeTbl) {
-                    GridMergeTable tbl;
+                    ReduceTable tbl;
 
                     try {
                         tbl = createMergeTable(r.connection(), mapQry, qry.explain());
@@ -583,7 +583,7 @@ public class GridReduceQueryExecutor {
                     fakeTable(r.connection(), tblIdx++).innerTable(tbl);
                 }
                 else
-                    idx = GridMergeIndexUnsorted.createDummy(ctx);
+                    idx = ReduceIndexUnsorted.createDummy(ctx);
 
                 // If the query has only replicated tables, we have to run it on a single node only.
                 if (!mapQry.isPartitioned()) {
@@ -745,7 +745,7 @@ public class GridReduceQueryExecutor {
 
                 if (!retry) {
                     if (skipMergeTbl) {
-                        resIter = new GridMergeIndexIterator(this,
+                        resIter = new ReduceIndexIterator(this,
                             finalNodes,
                             r,
                             qryReqId,
@@ -877,6 +877,7 @@ public class GridReduceQueryExecutor {
      * @param cancel Cancel state.
      * @return Update result, or {@code null} when some map node doesn't support distributed DML.
      */
+    @SuppressWarnings("IfMayBeConditional")
     public UpdateResult update(
         String schemaName,
         List<Integer> cacheIds,
@@ -920,7 +921,7 @@ public class GridReduceQueryExecutor {
             }
         }
 
-        final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size());
+        final DmlDistributedUpdateRun r = new DmlDistributedUpdateRun(nodes.size());
 
         int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
 
@@ -991,7 +992,7 @@ public class GridReduceQueryExecutor {
         try {
             long reqId = msg.requestId();
 
-            DistributedUpdateRun r = updRuns.get(reqId);
+            DmlDistributedUpdateRun r = updRuns.get(reqId);
 
             if (r == null) {
                 U.warn(log, "Unexpected dml response (will ignore). [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
@@ -1009,24 +1010,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param cacheIds Cache IDs.
-     * @return The first partitioned cache context.
-     */
-    private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
-        for (int i = 0; i < cacheIds.size(); i++) {
-            GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
-
-            if (i == 0 && cctx.isLocal())
-                throw new CacheException("Cache is LOCAL: " + cctx.name());
-
-            if (!cctx.isReplicated() && !cctx.isLocal())
-                return cctx;
-        }
-
-        throw new IllegalStateException("Failed to find partitioned cache.");
-    }
-
-    /**
      * Returns true if the exception is triggered by query cancel.
      *
      * @param e Exception.
@@ -1050,7 +1033,7 @@ public class GridReduceQueryExecutor {
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
         else {
-            for (GridMergeIndex idx : r.indexes()) {
+            for (ReduceIndex idx : r.indexes()) {
                 if (!idx.fetchedAll()) {
                     send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
 
@@ -1096,8 +1079,8 @@ public class GridReduceQueryExecutor {
      * @param idx Index of table.
      * @return Table.
      */
-    private GridThreadLocalTable fakeTable(Connection c, int idx) {
-        List<GridThreadLocalTable> tbls = fakeTbls;
+    private ReduceTableWrapper fakeTable(Connection c, int idx) {
+        List<ReduceTableWrapper> tbls = fakeTbls;
 
         assert tbls.size() >= idx;
 
@@ -1106,18 +1089,12 @@ public class GridReduceQueryExecutor {
 
             try {
                 if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock.
-                    try (Statement stmt = c.createStatement()) {
-                        stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
-                            "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"');
-                    }
-                    catch (SQLException e) {
-                        throw new IllegalStateException(e);
-                    }
+                    ReduceTableWrapper tbl = ReduceTableEngine.create(c, idx);
 
-                    List<GridThreadLocalTable> newTbls = new ArrayList<>(tbls.size() + 1);
+                    List<ReduceTableWrapper> newTbls = new ArrayList<>(tbls.size() + 1);
 
                     newTbls.addAll(tbls);
-                    newTbls.add(GridThreadLocalTable.Engine.getCreated());
+                    newTbls.add(tbl);
 
                     fakeTbls = tbls = newTbls;
                 }
@@ -1151,7 +1128,7 @@ public class GridReduceQueryExecutor {
         int tblIdx = 0;
 
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
-            GridMergeTable tbl = createMergeTable(c, mapQry, false);
+            ReduceTable tbl = createMergeTable(c, mapQry, false);
 
             fakeTable(c, tblIdx++).innerTable(tbl);
         }
@@ -1250,7 +1227,7 @@ public class GridReduceQueryExecutor {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+    private ReduceTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
         throws IgniteCheckedException {
         try {
             Session ses = (Session)conn.getSession();
@@ -1286,25 +1263,25 @@ public class GridReduceQueryExecutor {
 
             boolean sortedIndex = !F.isEmpty(qry.sortColumns());
 
-            GridMergeTable tbl = new GridMergeTable(data);
+            ReduceTable tbl = new ReduceTable(data);
 
             ArrayList<Index> idxs = new ArrayList<>(2);
 
             if (explain) {
-                idxs.add(new GridMergeIndexUnsorted(ctx, tbl,
+                idxs.add(new ReduceIndexUnsorted(ctx, tbl,
                     sortedIndex ? MERGE_INDEX_SORTED : MERGE_INDEX_UNSORTED));
             }
             else if (sortedIndex) {
                 List<GridSqlSortColumn> sortCols = (List<GridSqlSortColumn>)qry.sortColumns();
 
-                GridMergeIndexSorted sortedMergeIdx = new GridMergeIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
+                ReduceIndexSorted sortedMergeIdx = new ReduceIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
                     GridSqlSortColumn.toIndexColumns(tbl, sortCols));
 
-                idxs.add(GridMergeTable.createScanIndex(sortedMergeIdx));
+                idxs.add(ReduceTable.createScanIndex(sortedMergeIdx));
                 idxs.add(sortedMergeIdx);
             }
             else
-                idxs.add(new GridMergeIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
+                idxs.add(new ReduceIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
 
             tbl.indexes(idxs);
 
@@ -1338,7 +1315,7 @@ public class GridReduceQueryExecutor {
         for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
             e.getValue().disconnected(err);
 
-        for (DistributedUpdateRun r: updRuns.values())
+        for (DmlDistributedUpdateRun r: updRuns.values())
             r.handleDisconnect(err);
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
new file mode 100644
index 0000000..87d4c14
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.RandomAccess;
+
+/**
+ *
+ */
+public class ReduceBlockList<Z> extends AbstractList<Z> implements RandomAccess {
+    /** */
+    private final List<List<Z>> blocks;
+
+    /** */
+    private int size;
+
+    /** */
+    private final int maxBlockSize;
+
+    /** */
+    private final int shift;
+
+    /** */
+    private final int mask;
+
+    /**
+     * @param maxBlockSize Max block size.
+     */
+    public ReduceBlockList(int maxBlockSize) {
+        assert U.isPow2(maxBlockSize);
+
+        this.maxBlockSize = maxBlockSize;
+
+        shift = Integer.numberOfTrailingZeros(maxBlockSize);
+        mask = maxBlockSize - 1;
+
+        blocks = new ArrayList<>();
+        blocks.add(new ArrayList<Z>());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean add(Z z) {
+        size++;
+
+        List<Z> lastBlock = lastBlock();
+
+        lastBlock.add(z);
+
+        if (lastBlock.size() == maxBlockSize)
+            blocks.add(new ArrayList<Z>());
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Z get(int idx) {
+        return blocks.get(idx >>> shift).get(idx & mask);
+    }
+
+    /**
+     * @return Last block.
+     */
+    public List<Z> lastBlock() {
+        return ReduceIndex.last(blocks);
+    }
+
+    /**
+     * @return Evicted block.
+     */
+    public List<Z> evictFirstBlock() {
+        // Remove head block.
+        List<Z> res = blocks.remove(0);
+
+        size -= res.size();
+
+        return res;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
similarity index 80%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
index 1c1cfaf..8266940 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
@@ -17,15 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.util.AbstractList;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.RandomAccess;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,7 +55,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger;
 /**
  * Merge index.
  */
-public abstract class GridMergeIndex extends BaseIndex {
+public abstract class ReduceIndex extends BaseIndex {
     /** */
     private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
 
@@ -66,8 +63,8 @@ public abstract class GridMergeIndex extends BaseIndex {
     private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
 
     /** */
-    private static final AtomicReferenceFieldUpdater<GridMergeIndex, ConcurrentMap> lastPagesUpdater =
-        AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages");
+    private static final AtomicReferenceFieldUpdater<ReduceIndex, ConcurrentMap> LAST_PAGES_UPDATER =
+        AtomicReferenceFieldUpdater.newUpdater(ReduceIndex.class, ConcurrentMap.class, "lastPages");
 
     static {
         if (!U.isPow2(PREFETCH_SIZE)) {
@@ -83,6 +80,7 @@ public abstract class GridMergeIndex extends BaseIndex {
 
     /** */
     protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
+        @SuppressWarnings("ComparatorMethodParameterNotUsed")
         @Override public int compare(SearchRow rowInList, SearchRow searchRow) {
             int res = compareRows(rowInList, searchRow);
 
@@ -92,6 +90,7 @@ public abstract class GridMergeIndex extends BaseIndex {
 
     /** */
     protected final Comparator<SearchRow> lastRowCmp = new Comparator<SearchRow>() {
+        @SuppressWarnings("ComparatorMethodParameterNotUsed")
         @Override public int compare(SearchRow rowInList, SearchRow searchRow) {
             int res = compareRows(rowInList, searchRow);
 
@@ -108,19 +107,17 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * Will be r/w from query execution thread only, does not need to be threadsafe.
      */
-    private final BlockList<Row> fetched;
+    private final ReduceBlockList<Row> fetched;
 
     /** */
     private Row lastEvictedRow;
 
     /** */
-    private volatile int fetchedCnt;
-
-    /** */
     private final GridKernalContext ctx;
 
-    /** */
-    private volatile ConcurrentMap<SourceKey, Integer> lastPages;
+    /** DO NOT change name field of this field, updated through {@link #LAST_PAGES_UPDATER} */
+    @SuppressWarnings("unused")
+    private volatile ConcurrentMap<ReduceSourceKey, Integer> lastPages;
 
     /**
      * @param ctx Context.
@@ -129,8 +126,8 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param type Type.
      * @param cols Columns.
      */
-    public GridMergeIndex(GridKernalContext ctx,
-        GridMergeTable tbl,
+    protected ReduceIndex(GridKernalContext ctx,
+        ReduceTable tbl,
         String name,
         IndexType type,
         IndexColumn[] cols
@@ -143,10 +140,10 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param ctx Context.
      */
-    protected GridMergeIndex(GridKernalContext ctx) {
+    protected ReduceIndex(GridKernalContext ctx) {
         this.ctx = ctx;
 
-        fetched = new BlockList<>(PREFETCH_SIZE);
+        fetched = new ReduceBlockList<>(PREFETCH_SIZE);
     }
 
     /**
@@ -222,8 +219,8 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param queue Queue to poll.
      * @return Next page.
      */
-    private GridResultPage takeNextPage(Pollable<GridResultPage> queue) {
-        GridResultPage page;
+    private ReduceResultPage takeNextPage(Pollable<ReduceResultPage> queue) {
+        ReduceResultPage page;
 
         for (;;) {
             try {
@@ -247,9 +244,9 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param iter Current iterator.
      * @return The same or new iterator.
      */
-    protected final Iterator<Value[]> pollNextIterator(Pollable<GridResultPage> queue, Iterator<Value[]> iter) {
+    protected final Iterator<Value[]> pollNextIterator(Pollable<ReduceResultPage> queue, Iterator<Value[]> iter) {
         if (!iter.hasNext()) {
-            GridResultPage page = takeNextPage(queue);
+            ReduceResultPage page = takeNextPage(queue);
 
             if (!page.isLast())
                 page.fetchNextPage(); // Failed will throw an exception here.
@@ -279,7 +276,7 @@ public abstract class GridMergeIndex extends BaseIndex {
         if (nodeId == null)
             nodeId = F.first(sources);
 
-        addPage0(new GridResultPage(null, nodeId, null) {
+        addPage0(new ReduceResultPage(null, nodeId, null) {
             @Override public boolean isFail() {
                 return true;
             }
@@ -307,9 +304,9 @@ public abstract class GridMergeIndex extends BaseIndex {
         if (allRows < 0 || res.page() != 0)
             return;
 
-        ConcurrentMap<SourceKey,Integer> lp = lastPages;
+        ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
 
-        if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+        if (lp == null && !LAST_PAGES_UPDATER.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
             lp = lastPages;
 
         assert pageSize > 0: pageSize;
@@ -318,14 +315,14 @@ public abstract class GridMergeIndex extends BaseIndex {
 
         assert lastPage >= 0: lastPage;
 
-        if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null)
+        if (lp.put(new ReduceSourceKey(nodeId, res.segmentId()), lastPage) != null)
             throw new IllegalStateException();
     }
 
     /**
      * @param page Page.
      */
-    private void markLastPage(GridResultPage page) {
+    private void markLastPage(ReduceResultPage page) {
         GridQueryNextPageResponse res = page.response();
 
         if (!res.last()) {
@@ -333,12 +330,12 @@ public abstract class GridMergeIndex extends BaseIndex {
 
             initLastPages(nodeId, res);
 
-            ConcurrentMap<SourceKey,Integer> lp = lastPages;
+            ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
 
             if (lp == null)
                 return; // It was not initialized --> wait for last page flag.
 
-            Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId()));
+            Integer lastPage = lp.get(new ReduceSourceKey(nodeId, res.segmentId()));
 
             if (lastPage == null)
                 return; // This node may use the new protocol --> wait for last page flag.
@@ -356,7 +353,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param page Page.
      */
-    public final void addPage(GridResultPage page) {
+    public final void addPage(ReduceResultPage page) {
         markLastPage(page);
         addPage0(page);
     }
@@ -365,16 +362,16 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param lastPage Real last page.
      * @return Created dummy page.
      */
-    protected final GridResultPage createDummyLastPage(GridResultPage lastPage) {
+    protected final ReduceResultPage createDummyLastPage(ReduceResultPage lastPage) {
         assert !lastPage.isDummyLast(); // It must be a real last page.
 
-        return new GridResultPage(ctx, lastPage.source(), null).setLast(true);
+        return new ReduceResultPage(ctx, lastPage.source(), null).setLast(true);
     }
 
     /**
      * @param page Page.
      */
-    protected abstract void addPage0(GridResultPage page);
+    protected abstract void addPage0(ReduceResultPage page);
 
     /** {@inheritDoc} */
     @Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
@@ -512,7 +509,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param l List.
      * @return Last element.
      */
-    private static <Z> Z last(List<Z> l) {
+    public static <Z> Z last(List<Z> l) {
         return l.get(l.size() - 1);
     }
 
@@ -637,8 +634,6 @@ public abstract class GridMergeIndex extends BaseIndex {
                     cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done.
                 else {
                     // Update fetched count.
-                    fetchedCnt += rows.size() - cur;
-
                     if (haveBounds()) {
                         cur = findBounds();
 
@@ -676,88 +671,6 @@ public abstract class GridMergeIndex extends BaseIndex {
         }
     }
 
-    /** */
-    enum State {
-        UNINITIALIZED, INITIALIZED, FINISHED
-    }
-
-    /**
-     */
-    private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
-        /** */
-        private final List<List<Z>> blocks;
-
-        /** */
-        private int size;
-
-        /** */
-        private final int maxBlockSize;
-
-        /** */
-        private final int shift;
-
-        /** */
-        private final int mask;
-
-        /**
-         * @param maxBlockSize Max block size.
-         */
-        private BlockList(int maxBlockSize) {
-            assert U.isPow2(maxBlockSize);
-
-            this.maxBlockSize = maxBlockSize;
-
-            shift = Integer.numberOfTrailingZeros(maxBlockSize);
-            mask = maxBlockSize - 1;
-
-            blocks = new ArrayList<>();
-            blocks.add(new ArrayList<Z>());
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return size;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean add(Z z) {
-            size++;
-
-            List<Z> lastBlock = lastBlock();
-
-            lastBlock.add(z);
-
-            if (lastBlock.size() == maxBlockSize)
-                blocks.add(new ArrayList<Z>());
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Z get(int idx) {
-            return blocks.get(idx >>> shift).get(idx & mask);
-        }
-
-        /**
-         * @return Last block.
-         */
-        private List<Z> lastBlock() {
-            return last(blocks);
-        }
-
-        /**
-         * @return Evicted block.
-         */
-        private List<Z> evictFirstBlock() {
-            // Remove head block.
-            List<Z> res = blocks.remove(0);
-
-            size -= res.size();
-
-            return res;
-        }
-    }
-
     /**
      * Pollable.
      */
@@ -771,39 +684,4 @@ public abstract class GridMergeIndex extends BaseIndex {
         E poll(long timeout, TimeUnit unit) throws InterruptedException;
     }
 
-    /**
-     */
-    private static class SourceKey {
-        final UUID nodeId;
-
-        /** */
-        final int segment;
-
-        /**
-         * @param nodeId Node ID.
-         * @param segment Segment.
-         */
-        SourceKey(UUID nodeId, int segment) {
-            this.nodeId = nodeId;
-            this.segment = segment;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            SourceKey sourceKey = (SourceKey)o;
-
-            if (segment != sourceKey.segment) return false;
-            return nodeId.equals(sourceKey.nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int result = nodeId.hashCode();
-            result = 31 * result + segment;
-            return result;
-        }
-    }
 }
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
index 851e1e4..3ff3a15 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.h2.index.Cursor;
@@ -30,9 +29,9 @@ import org.h2.result.Row;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
+ * Iterator that transparently and sequentially traverses a bunch of {@link ReduceIndex} objects.
  */
-class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
+public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
     /** Reduce query executor. */
     private final GridReduceQueryExecutor rdcExec;
 
@@ -49,7 +48,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
     private final boolean distributedJoins;
 
     /** Iterator over indexes. */
-    private final Iterator<GridMergeIndex> idxIter;
+    private final Iterator<ReduceIndex> idxIter;
 
     /** Current cursor. */
     private Cursor cursor;
@@ -71,15 +70,14 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
      * @param run Query run.
      * @param qryReqId Query request ID.
      * @param distributedJoins Distributed joins.
-     * @throws IgniteCheckedException if failed.
      */
-    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+    public ReduceIndexIterator(GridReduceQueryExecutor rdcExec,
         Collection<ClusterNode> nodes,
         ReduceQueryRun run,
         long qryReqId,
         boolean distributedJoins,
-        @Nullable MvccQueryTracker mvccTracker)
-        throws IgniteCheckedException {
+        @Nullable MvccQueryTracker mvccTracker
+    ) {
         this.rdcExec = rdcExec;
         this.nodes = nodes;
         this.run = run;
@@ -87,7 +85,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
         this.distributedJoins = distributedJoins;
         this.mvccTracker = mvccTracker;
 
-        this.idxIter = run.indexes().iterator();
+        idxIter = run.indexes().iterator();
 
         advance();
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
similarity index 95%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
index 482752a..4dcfbbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
@@ -54,7 +54,7 @@ import static java.util.Collections.emptyIterator;
 /**
  * Sorted index.
  */
-public final class GridMergeIndexSorted extends GridMergeIndex {
+public final class ReduceIndexSorted extends ReduceIndex {
     /** */
     private static final IndexType TYPE = IndexType.createNonUnique(false);
 
@@ -84,7 +84,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     private final Condition notEmpty = lock.newCondition();
 
     /** */
-    private GridResultPage failPage;
+    private ReduceResultPage failPage;
 
     /** */
     private MergeStreamIterator it;
@@ -95,9 +95,9 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
      * @param name Index name,
      * @param cols Columns.
      */
-    public GridMergeIndexSorted(
+    public ReduceIndexSorted(
         GridKernalContext ctx,
-        GridMergeTable tbl,
+        ReduceTable tbl,
         String name,
         IndexColumn[] cols
     ) {
@@ -132,7 +132,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
-    @Override protected void addPage0(GridResultPage page) {
+    @Override protected void addPage0(ReduceResultPage page) {
         if (page.isFail()) {
             lock.lock();
 
@@ -295,7 +295,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     /**
      * Row stream.
      */
-    private final class RowStream implements Pollable<GridResultPage> {
+    private final class RowStream implements Pollable<ReduceResultPage> {
         /** */
         Iterator<Value[]> iter = emptyIterator();
 
@@ -303,12 +303,12 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
         Row cur;
 
         /** */
-        GridResultPage nextPage;
+        ReduceResultPage nextPage;
 
         /**
          * @param page Page.
          */
-        private void addPage(GridResultPage page) {
+        private void addPage(ReduceResultPage page) {
             assert !page.isFail();
 
             if (page.isLast() && page.rowsInPage() == 0)
@@ -330,7 +330,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
         }
 
         /** {@inheritDoc} */
-        @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
+        @Override public ReduceResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
             long nanos = unit.toNanos(timeout);
 
             lock.lock();
@@ -340,7 +340,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
                     if (failPage != null)
                         return failPage;
 
-                    GridResultPage page = nextPage;
+                    ReduceResultPage page = nextPage;
 
                     if (page != null) {
                         // isLast && !isDummyLast
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
index 4e9d11a..b0f1c5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
@@ -42,12 +42,12 @@ import org.h2.value.Value;
 /**
  * Unsorted merge index.
  */
-public final class GridMergeIndexUnsorted extends GridMergeIndex {
+public final class ReduceIndexUnsorted extends ReduceIndex {
     /** */
     private static final IndexType TYPE = IndexType.createScan(false);
 
     /** */
-    private final PollableQueue<GridResultPage> queue = new PollableQueue<>();
+    private final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
 
     /** */
     private final AtomicInteger activeSources = new AtomicInteger(-1);
@@ -60,7 +60,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
      * @param tbl  Table.
      * @param name Index name.
      */
-    public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) {
+    public ReduceIndexUnsorted(GridKernalContext ctx, ReduceTable tbl, String name) {
         super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
     }
 
@@ -68,14 +68,14 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
      * @param ctx Context.
      * @return Dummy index instance.
      */
-    public static GridMergeIndexUnsorted createDummy(GridKernalContext ctx) {
-        return new GridMergeIndexUnsorted(ctx);
+    public static ReduceIndexUnsorted createDummy(GridKernalContext ctx) {
+        return new ReduceIndexUnsorted(ctx);
     }
 
     /**
      * @param ctx Context.
      */
-    private GridMergeIndexUnsorted(GridKernalContext ctx) {
+    private ReduceIndexUnsorted(GridKernalContext ctx) {
         super(ctx);
     }
 
@@ -100,7 +100,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
-    @Override protected void addPage0(GridResultPage page) {
+    @Override protected void addPage0(ReduceResultPage page) {
         assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
 
         // Do not add empty page to avoid premature stream termination.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
index 608f307..c898a5f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
@@ -578,7 +578,7 @@ public class ReducePartitionMapper {
     private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> res = ctx.discovery().cacheGroupAffinityNodes(grpId, topVer);
 
-        return res != null ? res : Collections.<ClusterNode>emptySet();
+        return res != null ? res : Collections.emptySet();
     }
 
     /**
@@ -616,7 +616,7 @@ public class ReducePartitionMapper {
      * @param cacheIds Cache IDs.
      * @return The first partitioned cache context.
      */
-    private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
+    public GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
         for (int i = 0; i < cacheIds.size(); i++) {
             GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index d2f7e9a..91ab3e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -34,9 +34,9 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Query run.
  */
-class ReduceQueryRun {
+public class ReduceQueryRun {
     /** */
-    private final List<GridMergeIndex> idxs;
+    private final List<ReduceIndex> idxs;
 
     /** */
     private CountDownLatch latch;
@@ -72,7 +72,9 @@ class ReduceQueryRun {
         Boolean dataPageScanEnabled
     ) {
         this.conn = (JdbcConnection)conn;
-        this.idxs = new ArrayList<>(idxsCnt);
+
+        idxs = new ArrayList<>(idxsCnt);
+
         this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
         this.selectForUpdateFut = selectForUpdateFut;
         this.dataPageScanEnabled  = dataPageScanEnabled;
@@ -130,7 +132,7 @@ class ReduceQueryRun {
         while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
             latch.countDown();
 
-        for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+        for (ReduceIndex idx : idxs) // Fail all merge indexes.
             idx.fail(state.nodeId, state.ex);
     }
 
@@ -199,7 +201,7 @@ class ReduceQueryRun {
     /**
      * @return Indexes.
      */
-    List<GridMergeIndex> indexes() {
+    List<ReduceIndex> indexes() {
         return idxs;
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
similarity index 96%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
index 0cb986b..4437dd6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
@@ -17,14 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
@@ -34,12 +26,21 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.value.Value;
 
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray;
 
 /**
  * Page result.
  */
-public class GridResultPage {
+public class ReduceResultPage {
     /** */
     private final UUID src;
 
@@ -61,7 +62,7 @@ public class GridResultPage {
      * @param res Response.
      */
     @SuppressWarnings("unchecked")
-    public GridResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) {
+    public ReduceResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) {
         assert src != null;
 
         this.src = src;
@@ -158,7 +159,7 @@ public class GridResultPage {
      * @param last Last page for a source.
      * @return {@code this}.
      */
-    public GridResultPage setLast(boolean last) {
+    public ReduceResultPage setLast(boolean last) {
         this.last = last;
 
         return this;
@@ -214,6 +215,6 @@ public class GridResultPage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridResultPage.class, this);
+        return S.toString(ReduceResultPage.class, this);
     }
 }
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java
new file mode 100644
index 0000000..f380145
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
+import org.h2.engine.Session;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.TableFilter;
+
+import java.util.HashSet;
+
+/**
+ * Scan index wrapper.
+ */
+public class ReduceScanIndex extends GridH2ScanIndex<ReduceIndex> {
+    /**
+     * @param delegate Delegate.
+     */
+    public ReduceScanIndex(ReduceIndex delegate) {
+        super(delegate);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
+        SortOrder sortOrder, HashSet<Column> allColumnsSet) {
+        long rows = getRowCountApproximation();
+
+        return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true, allColumnsSet);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java
new file mode 100644
index 0000000..e673722
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.UUID;
+
+/**
+ * Reduce source key for a specific remote data source (remote node + specific segment).
+ */
+public class ReduceSourceKey {
+    /** Node ID. */
+    private final UUID nodeId;
+
+    /** Segment. */
+    private final int segment;
+
+    /**
+     * @param nodeId Node ID.
+     * @param segment Segment.
+     */
+    public ReduceSourceKey(UUID nodeId, int segment) {
+        this.nodeId = nodeId;
+        this.segment = segment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ReduceSourceKey other = (ReduceSourceKey)o;
+
+        return F.eq(segment, other.segment) && F.eq(nodeId, other.nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * nodeId.hashCode() + segment;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
similarity index 79%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
index 681917f..c520c2a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
 import org.apache.ignite.internal.util.typedef.F;
@@ -28,24 +27,24 @@ import org.h2.index.Index;
 import org.h2.index.IndexType;
 import org.h2.message.DbException;
 import org.h2.result.Row;
-import org.h2.result.SortOrder;
-import org.h2.table.Column;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableBase;
-import org.h2.table.TableFilter;
 import org.h2.table.TableType;
 
 /**
- * Merge table for distributed queries.
+ * Table for reduce phase. Created for every splitted map query. Tables are created in H2 through SQL statement.
+ * In order to avoid overhead on SQL execution for every query, we create {@link ReduceTableWrapper} instead
+ * and set real {@code ReduceTable} through thread-local during query execution. This allow us to have only
+ * very limited number of physical tables in H2 for all user requests.
  */
-public class GridMergeTable extends TableBase {
+public class ReduceTable extends TableBase {
     /** */
     private ArrayList<Index> idxs;
 
     /**
      * @param data Data.
      */
-    public GridMergeTable(CreateTableData data) {
+    public ReduceTable(CreateTableData data) {
         super(data);
     }
 
@@ -61,16 +60,16 @@ public class GridMergeTable extends TableBase {
     /**
      * @return Merge index.
      */
-    public GridMergeIndex getMergeIndex() {
-        return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+    public ReduceIndex getMergeIndex() {
+        return (ReduceIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
     }
 
     /**
      * @param idx Index.
      * @return Scan index.
      */
-    public static GridH2ScanIndex<GridMergeIndex> createScanIndex(GridMergeIndex idx) {
-        return new ScanIndex(idx);
+    public static GridH2ScanIndex<ReduceIndex> createScanIndex(ReduceIndex idx) {
+        return new ReduceScanIndex(idx);
     }
 
     /** {@inheritDoc} */
@@ -178,24 +177,4 @@ public class GridMergeTable extends TableBase {
     @Override public void checkRename() {
         throw DbException.getUnsupportedException("rename");
     }
-
-    /**
-     * Scan index wrapper.
-     */
-    private static class ScanIndex extends GridH2ScanIndex<GridMergeIndex> {
-        /**
-         * @param delegate Delegate.
-         */
-        public ScanIndex(GridMergeIndex delegate) {
-            super(delegate);
-        }
-
-        /** {@inheritDoc} */
-        @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
-            SortOrder sortOrder, HashSet<Column> allColumnsSet) {
-            long rows = getRowCountApproximation();
-
-            return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true, allColumnsSet);
-        }
-    }
 }
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java
new file mode 100644
index 0000000..1ca2e48
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.h2.api.TableEngine;
+import org.h2.command.ddl.CreateTableData;
+import org.h2.table.Table;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
+
+/**
+ * Engine to create reduce table.
+ */
+public class ReduceTableEngine implements TableEngine {
+    /** */
+    private static final ThreadLocal<ReduceTableWrapper> CREATED_TBL = new ThreadLocal<>();
+
+    /**
+     * Create merge table over the given connection with provided index.
+     *
+     * @param conn Connection.
+     * @param idx Index.
+     * @return Created table.
+     */
+    public static ReduceTableWrapper create(Connection conn, int idx) {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
+                "(fake BOOL) ENGINE \"" + ReduceTableEngine.class.getName() + '"');
+        }
+        catch (SQLException e) {
+            throw new IllegalStateException(e);
+        }
+
+        ReduceTableWrapper tbl = CREATED_TBL.get();
+
+        assert tbl != null;
+
+        CREATED_TBL.remove();
+
+        return tbl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Table createTable(CreateTableData d) {
+        assert CREATED_TBL.get() == null;
+
+        ReduceTableWrapper tbl = new ReduceTableWrapper(
+            d.schema,
+            d.id,
+            d.tableName,
+            d.persistIndexes,
+            d.persistData
+        );
+
+        CREATED_TBL.set(tbl);
+
+        return tbl;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
similarity index 85%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
index b01c3d4..ddc0898 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
@@ -21,8 +21,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import javax.cache.CacheException;
-import org.h2.api.TableEngine;
-import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.DbObject;
 import org.h2.engine.Session;
 import org.h2.index.Index;
@@ -41,11 +39,12 @@ import org.h2.table.TableType;
 import org.h2.value.Value;
 
 /**
- * Thread local table wrapper for another table instance.
+ * Thread local table wrapper for real reducer table. All reduce queries share the same small set of this fake tables.
+ * During reduce query execution every thread installs it's own real reduce tables into thread-local storage.
  */
-public class GridThreadLocalTable extends Table {
+public class ReduceTableWrapper extends Table {
     /** Delegate table */
-    private final ThreadLocal<Table> tbl = new ThreadLocal<>();
+    private final ThreadLocal<ReduceTable> tbl = new ThreadLocal<>();
 
     /**
      * @param schema Schema.
@@ -54,14 +53,14 @@ public class GridThreadLocalTable extends Table {
      * @param persistIndexes Persist indexes.
      * @param persistData Persist data.
      */
-    public GridThreadLocalTable(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
+    public ReduceTableWrapper(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
         super(schema, id, name, persistIndexes, persistData);
     }
 
     /**
      * @param t Table or {@code null} to reset existing.
      */
-    public void innerTable(Table t) {
+    public void innerTable(ReduceTable t) {
         if (t == null)
             tbl.remove();
         else
@@ -259,37 +258,4 @@ public class GridThreadLocalTable extends Table {
     @Override public void checkRename() {
         throw DbException.getUnsupportedException("rename");
     }
-
-    /**
-     * Engine.
-     */
-    public static class Engine implements TableEngine {
-        /** */
-        private static ThreadLocal<GridThreadLocalTable> createdTbl = new ThreadLocal<>();
-
-        /**
-         * @return Created table.
-         */
-        public static GridThreadLocalTable getCreated() {
-            GridThreadLocalTable tbl = createdTbl.get();
-
-            assert tbl != null;
-
-            createdTbl.remove();
-
-            return tbl;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Table createTable(CreateTableData d) {
-            assert createdTbl.get() == null;
-
-            GridThreadLocalTable tbl = new GridThreadLocalTable(d.schema, d.id, d.tableName, d.persistIndexes,
-                d.persistData);
-
-            createdTbl.set(tbl);
-
-            return tbl;
-        }
-    }
 }
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index d6937b1..73964a4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -47,7 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMergeIndex;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReduceIndex;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -575,7 +575,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
             Integer.class, Value.class));
 
         try {
-            GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 8);
+            GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 8);
 
             Random rnd = new GridRandom();
 
@@ -625,7 +625,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
             }
         }
         finally {
-            GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 1024);
+            GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 1024);
 
             c.destroy();
         }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
index 7e0aaa5..68c3e09 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
@@ -467,7 +467,7 @@ public class NoneOrSinglePartitionsQueryOptimizationsTest extends GridCommonAbst
             if (expMergeTbl)
                 assertTrue(innerIter instanceof H2ResultSetIterator);
             else
-                assertTrue(innerIter instanceof GridMergeIndexIterator);
+                assertTrue(innerIter instanceof ReduceIndexIterator);
 
             List<List<?>> all = new ArrayList<>();