You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/06 10:00:58 UTC
[ignite] branch master updated: IGNITE-11180: SQL: Proper naming
for map- and reduce- related classes. This closes #6008.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 731f740 IGNITE-11180: SQL: Proper naming for map- and reduce- related classes. This closes #6008.
731f740 is described below
commit 731f74028d3cfc881f3da2fc46c0500a37d80074
Author: devozerov <vo...@gridgain.com>
AuthorDate: Wed Feb 6 13:00:39 2019 +0300
IGNITE-11180: SQL: Proper naming for map- and reduce- related classes. This closes #6008.
---
.../processors/query/h2/database/H2Tree.java | 5 +-
.../DmlDistributedUpdateRun.java} | 14 +-
.../processors/query/h2/opt/GridH2ProxyIndex.java | 50 +-----
.../query/h2/opt/GridH2RowDescriptor.java | 3 +-
.../h2/opt/join/ProxyDistributedLookupBatch.java | 76 +++++++++
.../query/h2/twostep/GridReduceQueryExecutor.java | 85 ++++------
.../query/h2/twostep/ReduceBlockList.java | 103 ++++++++++++
.../{GridMergeIndex.java => ReduceIndex.java} | 180 ++++-----------------
...IndexIterator.java => ReduceIndexIterator.java} | 16 +-
...ergeIndexSorted.java => ReduceIndexSorted.java} | 20 +--
...IndexUnsorted.java => ReduceIndexUnsorted.java} | 14 +-
.../query/h2/twostep/ReducePartitionMapper.java | 4 +-
.../query/h2/twostep/ReduceQueryRun.java | 12 +-
.../{GridResultPage.java => ReduceResultPage.java} | 25 +--
.../query/h2/twostep/ReduceScanIndex.java | 46 ++++++
.../query/h2/twostep/ReduceSourceKey.java | 60 +++++++
.../{GridMergeTable.java => ReduceTable.java} | 41 ++---
.../query/h2/twostep/ReduceTableEngine.java | 78 +++++++++
...readLocalTable.java => ReduceTableWrapper.java} | 46 +-----
.../query/IgniteSqlSplitterSelfTest.java | 6 +-
...neOrSinglePartitionsQueryOptimizationsTest.java | 2 +-
21 files changed, 508 insertions(+), 378 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index a3d865c..53e2af1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.result.SearchRow;
import org.h2.table.IndexColumn;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -396,7 +397,7 @@ public class H2Tree extends BPlusTree<H2Row, H2Row> {
inlineSizeRecomendation(row);
- org.h2.result.SearchRow rowData = getRow(io, pageAddr, idx);
+ SearchRow rowData = getRow(io, pageAddr, idx);
for (int i = lastIdxUsed, len = cols.length; i < len; i++) {
IndexColumn col = cols[i];
@@ -500,7 +501,7 @@ public class H2Tree extends BPlusTree<H2Row, H2Row> {
* @param row Grid H2 row related to given inline indexes.
*/
@SuppressWarnings({"ConditionalBreakInInfiniteLoop", "IfMayBeConditional"})
- private void inlineSizeRecomendation(org.h2.result.SearchRow row) {
+ private void inlineSizeRecomendation(SearchRow row) {
//Do the check only for put operations.
if(!(row instanceof H2CacheRow))
return;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
index 9e7b9ae..9982141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedUpdateRun.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.twostep;
+package org.apache.ignite.internal.processors.query.h2.dml;
import java.util.Arrays;
import java.util.HashSet;
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.F;
/**
* Context for DML operation on reducer node.
*/
-class DistributedUpdateRun {
+public class DmlDistributedUpdateRun {
/** Expected number of responses. */
private final int nodeCount;
@@ -52,7 +52,7 @@ class DistributedUpdateRun {
*
* @param nodeCount Number of nodes to await results from.
*/
- DistributedUpdateRun(int nodeCount) {
+ public DmlDistributedUpdateRun(int nodeCount) {
this.nodeCount = nodeCount;
rspNodes = new HashSet<>(nodeCount);
@@ -61,7 +61,7 @@ class DistributedUpdateRun {
/**
* @return Result future.
*/
- GridFutureAdapter<UpdateResult> future() {
+ public GridFutureAdapter<UpdateResult> future() {
return fut;
}
@@ -69,7 +69,7 @@ class DistributedUpdateRun {
* Handle disconnection.
* @param e Pre-formatted error.
*/
- void handleDisconnect(CacheException e) {
+ public void handleDisconnect(CacheException e) {
fut.onDone(new IgniteCheckedException("Update failed because client node have disconnected.", e));
}
@@ -78,7 +78,7 @@ class DistributedUpdateRun {
*
* @param nodeId Node id.
*/
- void handleNodeLeft(UUID nodeId) {
+ public void handleNodeLeft(UUID nodeId) {
fut.onDone(new IgniteCheckedException("Update failed because map node left topology [nodeId=" + nodeId + "]"));
}
@@ -88,7 +88,7 @@ class DistributedUpdateRun {
* @param id Node id.
* @param msg Response message.
*/
- void handleResponse(UUID id, GridH2DmlResponse msg) {
+ public void handleResponse(UUID id, GridH2DmlResponse msg) {
synchronized (this) {
if (!rspNodes.add(id))
return; // ignore duplicated messages
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
index 95a253b..4897bd4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ProxyIndex.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import org.apache.ignite.internal.processors.query.h2.opt.join.ProxyDistributedLookupBatch;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
import org.h2.index.Cursor;
@@ -34,7 +35,6 @@ import org.h2.table.TableFilter;
import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.Future;
/**
* Allows to have 'free' index for alias columns
@@ -154,7 +154,12 @@ public class GridH2ProxyIndex extends BaseIndex {
@Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
IndexLookupBatch batch = idx.createLookupBatch(filters, filter);
- return batch != null ? new ProxyIndexLookupBatch(batch) : null;
+ if (batch == null)
+ return null;
+
+ GridH2RowDescriptor rowDesc = ((GridH2Table)idx.getTable()).rowDescriptor();
+
+ return new ProxyDistributedLookupBatch(batch, rowDesc);
}
/** {@inheritDoc} */
@@ -162,45 +167,4 @@ public class GridH2ProxyIndex extends BaseIndex {
// No-op. Will be removed when underlying index is removed
}
- /** Proxy lookup batch */
- private class ProxyIndexLookupBatch implements IndexLookupBatch {
-
- /** Underlying normal lookup batch */
- private final IndexLookupBatch target;
-
- /**
- * Creates proxy lookup batch.
- *
- * @param target Underlying index lookup batch.
- */
- private ProxyIndexLookupBatch(IndexLookupBatch target) {
- this.target = target;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addSearchRows(SearchRow first, SearchRow last) {
- GridH2RowDescriptor desc = ((GridH2Table)idx.getTable()).rowDescriptor();
- return target.addSearchRows(desc.prepareProxyIndexRow(first), desc.prepareProxyIndexRow(last));
- }
-
- /** {@inheritDoc} */
- @Override public boolean isBatchFull() {
- return target.isBatchFull();
- }
-
- /** {@inheritDoc} */
- @Override public List<Future<Cursor>> find() {
- return target.find();
- }
-
- /** {@inheritDoc} */
- @Override public String getPlanSQL() {
- return target.getPlanSQL();
- }
-
- /** {@inheritDoc} */
- @Override public void reset(boolean beforeQuery) {
- target.reset(beforeQuery);
- }
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 652c950..6a1f7f6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.h2.message.DbException;
+import org.h2.result.SearchRow;
import org.h2.value.DataType;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -349,7 +350,7 @@ public class GridH2RowDescriptor {
* @param row Source row.
* @return Result.
*/
- public org.h2.result.SearchRow prepareProxyIndexRow(org.h2.result.SearchRow row) {
+ public SearchRow prepareProxyIndexRow(SearchRow row) {
if (row == null)
return null;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java
new file mode 100644
index 0000000..652bd4b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/ProxyDistributedLookupBatch.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.result.SearchRow;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * Lookup batch for proxy indexes.
+ */
+public class ProxyDistributedLookupBatch implements IndexLookupBatch {
+ /** Underlying normal lookup batch */
+ private final IndexLookupBatch delegate;
+
+ /** Row descriptor. */
+ private final GridH2RowDescriptor rowDesc;
+
+ /**
+ * Creates proxy lookup batch.
+ *
+ * @param delegate Underlying index lookup batch.
+ * @param rowDesc Row descriptor.
+ */
+ public ProxyDistributedLookupBatch(IndexLookupBatch delegate, GridH2RowDescriptor rowDesc) {
+ this.delegate = delegate;
+ this.rowDesc = rowDesc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addSearchRows(SearchRow first, SearchRow last) {
+ SearchRow firstProxy = rowDesc.prepareProxyIndexRow(first);
+ SearchRow lastProxy = rowDesc.prepareProxyIndexRow(last);
+
+ return delegate.addSearchRows(firstProxy, lastProxy);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isBatchFull() {
+ return delegate.isBatchFull();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Future<Cursor>> find() {
+ return delegate.find();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getPlanSQL() {
+ return delegate.getPlanSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset(boolean beforeQuery) {
+ delegate.reset(beforeQuery);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 5ae2806..3ff6dfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -70,6 +69,7 @@ import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
@@ -141,10 +141,10 @@ public class GridReduceQueryExecutor {
private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap<>();
/** Contexts of running DML requests. */
- private final ConcurrentMap<Long, DistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, DmlDistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
/** */
- private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
+ private volatile List<ReduceTableWrapper> fakeTbls = Collections.emptyList();
/** */
private final Lock fakeTblsLock = new ReentrantLock();
@@ -207,7 +207,7 @@ public class GridReduceQueryExecutor {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
for (ReduceQueryRun r : runs.values()) {
- for (GridMergeIndex idx : r.indexes()) {
+ for (ReduceIndex idx : r.indexes()) {
if (idx.hasSource(nodeId)) {
handleNodeLeft(r, nodeId);
@@ -216,7 +216,7 @@ public class GridReduceQueryExecutor {
}
}
- for (DistributedUpdateRun r : updRuns.values())
+ for (DmlDistributedUpdateRun r : updRuns.values())
r.handleNodeLeft(nodeId);
}
@@ -306,12 +306,12 @@ public class GridReduceQueryExecutor {
final int pageSize = r.pageSize();
- GridMergeIndex idx = r.indexes().get(msg.query());
+ ReduceIndex idx = r.indexes().get(msg.query());
- GridResultPage page;
+ ReduceResultPage page;
try {
- page = new GridResultPage(ctx, node.id(), msg) {
+ page = new ReduceResultPage(ctx, node.id(), msg) {
@Override public void fetchNextPage() {
if (r.hasErrorOrRetry()) {
if (r.exception() != null)
@@ -388,7 +388,7 @@ public class GridReduceQueryExecutor {
* @param dataPageScanEnabled If data page scan is enabled.
* @return Rows iterator.
*/
- @SuppressWarnings("BusyWait")
+ @SuppressWarnings({"BusyWait", "IfMayBeConditional"})
public Iterator<List<?>> query(
String schemaName,
final GridCacheTwoStepQuery qry,
@@ -559,17 +559,17 @@ public class GridReduceQueryExecutor {
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable() || singlePartMode;
final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
- findFirstPartitioned(cacheIds).config().getQueryParallelism();
+ mapper.findFirstPartitioned(cacheIds).config().getQueryParallelism();
int replicatedQrysCnt = 0;
final Collection<ClusterNode> finalNodes = nodes;
for (GridCacheSqlQuery mapQry : mapQueries) {
- GridMergeIndex idx;
+ ReduceIndex idx;
if (!skipMergeTbl) {
- GridMergeTable tbl;
+ ReduceTable tbl;
try {
tbl = createMergeTable(r.connection(), mapQry, qry.explain());
@@ -583,7 +583,7 @@ public class GridReduceQueryExecutor {
fakeTable(r.connection(), tblIdx++).innerTable(tbl);
}
else
- idx = GridMergeIndexUnsorted.createDummy(ctx);
+ idx = ReduceIndexUnsorted.createDummy(ctx);
// If the query has only replicated tables, we have to run it on a single node only.
if (!mapQry.isPartitioned()) {
@@ -745,7 +745,7 @@ public class GridReduceQueryExecutor {
if (!retry) {
if (skipMergeTbl) {
- resIter = new GridMergeIndexIterator(this,
+ resIter = new ReduceIndexIterator(this,
finalNodes,
r,
qryReqId,
@@ -877,6 +877,7 @@ public class GridReduceQueryExecutor {
* @param cancel Cancel state.
* @return Update result, or {@code null} when some map node doesn't support distributed DML.
*/
+ @SuppressWarnings("IfMayBeConditional")
public UpdateResult update(
String schemaName,
List<Integer> cacheIds,
@@ -920,7 +921,7 @@ public class GridReduceQueryExecutor {
}
}
- final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size());
+ final DmlDistributedUpdateRun r = new DmlDistributedUpdateRun(nodes.size());
int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
@@ -991,7 +992,7 @@ public class GridReduceQueryExecutor {
try {
long reqId = msg.requestId();
- DistributedUpdateRun r = updRuns.get(reqId);
+ DmlDistributedUpdateRun r = updRuns.get(reqId);
if (r == null) {
U.warn(log, "Unexpected dml response (will ignore). [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
@@ -1009,24 +1010,6 @@ public class GridReduceQueryExecutor {
}
/**
- * @param cacheIds Cache IDs.
- * @return The first partitioned cache context.
- */
- private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
- for (int i = 0; i < cacheIds.size(); i++) {
- GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
-
- if (i == 0 && cctx.isLocal())
- throw new CacheException("Cache is LOCAL: " + cctx.name());
-
- if (!cctx.isReplicated() && !cctx.isLocal())
- return cctx;
- }
-
- throw new IllegalStateException("Failed to find partitioned cache.");
- }
-
- /**
* Returns true if the exception is triggered by query cancel.
*
* @param e Exception.
@@ -1050,7 +1033,7 @@ public class GridReduceQueryExecutor {
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
else {
- for (GridMergeIndex idx : r.indexes()) {
+ for (ReduceIndex idx : r.indexes()) {
if (!idx.fetchedAll()) {
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -1096,8 +1079,8 @@ public class GridReduceQueryExecutor {
* @param idx Index of table.
* @return Table.
*/
- private GridThreadLocalTable fakeTable(Connection c, int idx) {
- List<GridThreadLocalTable> tbls = fakeTbls;
+ private ReduceTableWrapper fakeTable(Connection c, int idx) {
+ List<ReduceTableWrapper> tbls = fakeTbls;
assert tbls.size() >= idx;
@@ -1106,18 +1089,12 @@ public class GridReduceQueryExecutor {
try {
if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock.
- try (Statement stmt = c.createStatement()) {
- stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
- "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"');
- }
- catch (SQLException e) {
- throw new IllegalStateException(e);
- }
+ ReduceTableWrapper tbl = ReduceTableEngine.create(c, idx);
- List<GridThreadLocalTable> newTbls = new ArrayList<>(tbls.size() + 1);
+ List<ReduceTableWrapper> newTbls = new ArrayList<>(tbls.size() + 1);
newTbls.addAll(tbls);
- newTbls.add(GridThreadLocalTable.Engine.getCreated());
+ newTbls.add(tbl);
fakeTbls = tbls = newTbls;
}
@@ -1151,7 +1128,7 @@ public class GridReduceQueryExecutor {
int tblIdx = 0;
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- GridMergeTable tbl = createMergeTable(c, mapQry, false);
+ ReduceTable tbl = createMergeTable(c, mapQry, false);
fakeTable(c, tblIdx++).innerTable(tbl);
}
@@ -1250,7 +1227,7 @@ public class GridReduceQueryExecutor {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+ private ReduceTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
throws IgniteCheckedException {
try {
Session ses = (Session)conn.getSession();
@@ -1286,25 +1263,25 @@ public class GridReduceQueryExecutor {
boolean sortedIndex = !F.isEmpty(qry.sortColumns());
- GridMergeTable tbl = new GridMergeTable(data);
+ ReduceTable tbl = new ReduceTable(data);
ArrayList<Index> idxs = new ArrayList<>(2);
if (explain) {
- idxs.add(new GridMergeIndexUnsorted(ctx, tbl,
+ idxs.add(new ReduceIndexUnsorted(ctx, tbl,
sortedIndex ? MERGE_INDEX_SORTED : MERGE_INDEX_UNSORTED));
}
else if (sortedIndex) {
List<GridSqlSortColumn> sortCols = (List<GridSqlSortColumn>)qry.sortColumns();
- GridMergeIndexSorted sortedMergeIdx = new GridMergeIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
+ ReduceIndexSorted sortedMergeIdx = new ReduceIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
GridSqlSortColumn.toIndexColumns(tbl, sortCols));
- idxs.add(GridMergeTable.createScanIndex(sortedMergeIdx));
+ idxs.add(ReduceTable.createScanIndex(sortedMergeIdx));
idxs.add(sortedMergeIdx);
}
else
- idxs.add(new GridMergeIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
+ idxs.add(new ReduceIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
tbl.indexes(idxs);
@@ -1338,7 +1315,7 @@ public class GridReduceQueryExecutor {
for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
e.getValue().disconnected(err);
- for (DistributedUpdateRun r: updRuns.values())
+ for (DmlDistributedUpdateRun r: updRuns.values())
r.handleDisconnect(err);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
new file mode 100644
index 0000000..87d4c14
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.RandomAccess;
+
+/**
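+ * List that keeps its elements in blocks of at most {@code maxBlockSize} items and supports evicting the first block.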
+ */
+public class ReduceBlockList<Z> extends AbstractList<Z> implements RandomAccess {
+ /** */
+ private final List<List<Z>> blocks;
+
+ /** */
+ private int size;
+
+ /** */
+ private final int maxBlockSize;
+
+ /** */
+ private final int shift;
+
+ /** */
+ private final int mask;
+
+ /**
+ * @param maxBlockSize Max block size.
+ */
+ public ReduceBlockList(int maxBlockSize) {
+ assert U.isPow2(maxBlockSize);
+
+ this.maxBlockSize = maxBlockSize;
+
+ shift = Integer.numberOfTrailingZeros(maxBlockSize);
+ mask = maxBlockSize - 1;
+
+ blocks = new ArrayList<>();
+ blocks.add(new ArrayList<Z>());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return size;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean add(Z z) {
+ size++;
+
+ List<Z> lastBlock = lastBlock();
+
+ lastBlock.add(z);
+
+ if (lastBlock.size() == maxBlockSize)
+ blocks.add(new ArrayList<Z>());
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Z get(int idx) {
+ return blocks.get(idx >>> shift).get(idx & mask);
+ }
+
+ /**
+ * @return Last block.
+ */
+ public List<Z> lastBlock() {
+ return ReduceIndex.last(blocks);
+ }
+
+ /**
+ * @return Evicted block.
+ */
+ public List<Z> evictFirstBlock() {
+ // Remove head block.
+ List<Z> res = blocks.remove(0);
+
+ size -= res.size();
+
+ return res;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
similarity index 80%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
index 1c1cfaf..8266940 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndex.java
@@ -17,15 +17,12 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.AbstractList;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,7 +55,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger;
/**
* Merge index.
*/
-public abstract class GridMergeIndex extends BaseIndex {
+public abstract class ReduceIndex extends BaseIndex {
/** */
private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
@@ -66,8 +63,8 @@ public abstract class GridMergeIndex extends BaseIndex {
private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
/** */
- private static final AtomicReferenceFieldUpdater<GridMergeIndex, ConcurrentMap> lastPagesUpdater =
- AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages");
+ private static final AtomicReferenceFieldUpdater<ReduceIndex, ConcurrentMap> LAST_PAGES_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ReduceIndex.class, ConcurrentMap.class, "lastPages");
static {
if (!U.isPow2(PREFETCH_SIZE)) {
@@ -83,6 +80,7 @@ public abstract class GridMergeIndex extends BaseIndex {
/** */
protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
+ @SuppressWarnings("ComparatorMethodParameterNotUsed")
@Override public int compare(SearchRow rowInList, SearchRow searchRow) {
int res = compareRows(rowInList, searchRow);
@@ -92,6 +90,7 @@ public abstract class GridMergeIndex extends BaseIndex {
/** */
protected final Comparator<SearchRow> lastRowCmp = new Comparator<SearchRow>() {
+ @SuppressWarnings("ComparatorMethodParameterNotUsed")
@Override public int compare(SearchRow rowInList, SearchRow searchRow) {
int res = compareRows(rowInList, searchRow);
@@ -108,19 +107,17 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* Will be r/w from query execution thread only, does not need to be threadsafe.
*/
- private final BlockList<Row> fetched;
+ private final ReduceBlockList<Row> fetched;
/** */
private Row lastEvictedRow;
/** */
- private volatile int fetchedCnt;
-
- /** */
private final GridKernalContext ctx;
- /** */
- private volatile ConcurrentMap<SourceKey, Integer> lastPages;
+ /** DO NOT rename this field: it is looked up by name by {@link #LAST_PAGES_UPDATER}. */
+ @SuppressWarnings("unused")
+ private volatile ConcurrentMap<ReduceSourceKey, Integer> lastPages;
/**
* @param ctx Context.
@@ -129,8 +126,8 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param type Type.
* @param cols Columns.
*/
- public GridMergeIndex(GridKernalContext ctx,
- GridMergeTable tbl,
+ protected ReduceIndex(GridKernalContext ctx,
+ ReduceTable tbl,
String name,
IndexType type,
IndexColumn[] cols
@@ -143,10 +140,10 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* @param ctx Context.
*/
- protected GridMergeIndex(GridKernalContext ctx) {
+ protected ReduceIndex(GridKernalContext ctx) {
this.ctx = ctx;
- fetched = new BlockList<>(PREFETCH_SIZE);
+ fetched = new ReduceBlockList<>(PREFETCH_SIZE);
}
/**
@@ -222,8 +219,8 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param queue Queue to poll.
* @return Next page.
*/
- private GridResultPage takeNextPage(Pollable<GridResultPage> queue) {
- GridResultPage page;
+ private ReduceResultPage takeNextPage(Pollable<ReduceResultPage> queue) {
+ ReduceResultPage page;
for (;;) {
try {
@@ -247,9 +244,9 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param iter Current iterator.
* @return The same or new iterator.
*/
- protected final Iterator<Value[]> pollNextIterator(Pollable<GridResultPage> queue, Iterator<Value[]> iter) {
+ protected final Iterator<Value[]> pollNextIterator(Pollable<ReduceResultPage> queue, Iterator<Value[]> iter) {
if (!iter.hasNext()) {
- GridResultPage page = takeNextPage(queue);
+ ReduceResultPage page = takeNextPage(queue);
if (!page.isLast())
page.fetchNextPage(); // Failed will throw an exception here.
@@ -279,7 +276,7 @@ public abstract class GridMergeIndex extends BaseIndex {
if (nodeId == null)
nodeId = F.first(sources);
- addPage0(new GridResultPage(null, nodeId, null) {
+ addPage0(new ReduceResultPage(null, nodeId, null) {
@Override public boolean isFail() {
return true;
}
@@ -307,9 +304,9 @@ public abstract class GridMergeIndex extends BaseIndex {
if (allRows < 0 || res.page() != 0)
return;
- ConcurrentMap<SourceKey,Integer> lp = lastPages;
+ ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
- if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+ if (lp == null && !LAST_PAGES_UPDATER.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
lp = lastPages;
assert pageSize > 0: pageSize;
@@ -318,14 +315,14 @@ public abstract class GridMergeIndex extends BaseIndex {
assert lastPage >= 0: lastPage;
- if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null)
+ if (lp.put(new ReduceSourceKey(nodeId, res.segmentId()), lastPage) != null)
throw new IllegalStateException();
}
/**
* @param page Page.
*/
- private void markLastPage(GridResultPage page) {
+ private void markLastPage(ReduceResultPage page) {
GridQueryNextPageResponse res = page.response();
if (!res.last()) {
@@ -333,12 +330,12 @@ public abstract class GridMergeIndex extends BaseIndex {
initLastPages(nodeId, res);
- ConcurrentMap<SourceKey,Integer> lp = lastPages;
+ ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
if (lp == null)
return; // It was not initialized --> wait for last page flag.
- Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId()));
+ Integer lastPage = lp.get(new ReduceSourceKey(nodeId, res.segmentId()));
if (lastPage == null)
return; // This node may use the new protocol --> wait for last page flag.
@@ -356,7 +353,7 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* @param page Page.
*/
- public final void addPage(GridResultPage page) {
+ public final void addPage(ReduceResultPage page) {
markLastPage(page);
addPage0(page);
}
@@ -365,16 +362,16 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param lastPage Real last page.
* @return Created dummy page.
*/
- protected final GridResultPage createDummyLastPage(GridResultPage lastPage) {
+ protected final ReduceResultPage createDummyLastPage(ReduceResultPage lastPage) {
assert !lastPage.isDummyLast(); // It must be a real last page.
- return new GridResultPage(ctx, lastPage.source(), null).setLast(true);
+ return new ReduceResultPage(ctx, lastPage.source(), null).setLast(true);
}
/**
* @param page Page.
*/
- protected abstract void addPage0(GridResultPage page);
+ protected abstract void addPage0(ReduceResultPage page);
/** {@inheritDoc} */
@Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
@@ -512,7 +509,7 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param l List.
* @return Last element.
*/
- private static <Z> Z last(List<Z> l) {
+ public static <Z> Z last(List<Z> l) {
return l.get(l.size() - 1);
}
@@ -637,8 +634,6 @@ public abstract class GridMergeIndex extends BaseIndex {
cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done.
else {
// Update fetched count.
- fetchedCnt += rows.size() - cur;
-
if (haveBounds()) {
cur = findBounds();
@@ -676,88 +671,6 @@ public abstract class GridMergeIndex extends BaseIndex {
}
}
- /** */
- enum State {
- UNINITIALIZED, INITIALIZED, FINISHED
- }
-
- /**
- */
- private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
- /** */
- private final List<List<Z>> blocks;
-
- /** */
- private int size;
-
- /** */
- private final int maxBlockSize;
-
- /** */
- private final int shift;
-
- /** */
- private final int mask;
-
- /**
- * @param maxBlockSize Max block size.
- */
- private BlockList(int maxBlockSize) {
- assert U.isPow2(maxBlockSize);
-
- this.maxBlockSize = maxBlockSize;
-
- shift = Integer.numberOfTrailingZeros(maxBlockSize);
- mask = maxBlockSize - 1;
-
- blocks = new ArrayList<>();
- blocks.add(new ArrayList<Z>());
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return size;
- }
-
- /** {@inheritDoc} */
- @Override public boolean add(Z z) {
- size++;
-
- List<Z> lastBlock = lastBlock();
-
- lastBlock.add(z);
-
- if (lastBlock.size() == maxBlockSize)
- blocks.add(new ArrayList<Z>());
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Z get(int idx) {
- return blocks.get(idx >>> shift).get(idx & mask);
- }
-
- /**
- * @return Last block.
- */
- private List<Z> lastBlock() {
- return last(blocks);
- }
-
- /**
- * @return Evicted block.
- */
- private List<Z> evictFirstBlock() {
- // Remove head block.
- List<Z> res = blocks.remove(0);
-
- size -= res.size();
-
- return res;
- }
- }
-
/**
* Pollable.
*/
@@ -771,39 +684,4 @@ public abstract class GridMergeIndex extends BaseIndex {
E poll(long timeout, TimeUnit unit) throws InterruptedException;
}
- /**
- */
- private static class SourceKey {
- final UUID nodeId;
-
- /** */
- final int segment;
-
- /**
- * @param nodeId Node ID.
- * @param segment Segment.
- */
- SourceKey(UUID nodeId, int segment) {
- this.nodeId = nodeId;
- this.segment = segment;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- SourceKey sourceKey = (SourceKey)o;
-
- if (segment != sourceKey.segment) return false;
- return nodeId.equals(sourceKey.nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = nodeId.hashCode();
- result = 31 * result + segment;
- return result;
- }
- }
}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
index 851e1e4..3ff3a15 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.h2.index.Cursor;
@@ -30,9 +29,9 @@ import org.h2.result.Row;
import org.jetbrains.annotations.Nullable;
/**
- * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
+ * Iterator that transparently and sequentially traverses a bunch of {@link ReduceIndex} objects.
*/
-class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
+public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
/** Reduce query executor. */
private final GridReduceQueryExecutor rdcExec;
@@ -49,7 +48,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
private final boolean distributedJoins;
/** Iterator over indexes. */
- private final Iterator<GridMergeIndex> idxIter;
+ private final Iterator<ReduceIndex> idxIter;
/** Current cursor. */
private Cursor cursor;
@@ -71,15 +70,14 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
* @param run Query run.
* @param qryReqId Query request ID.
* @param distributedJoins Distributed joins.
- * @throws IgniteCheckedException if failed.
*/
- GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec,
Collection<ClusterNode> nodes,
ReduceQueryRun run,
long qryReqId,
boolean distributedJoins,
- @Nullable MvccQueryTracker mvccTracker)
- throws IgniteCheckedException {
+ @Nullable MvccQueryTracker mvccTracker
+ ) {
this.rdcExec = rdcExec;
this.nodes = nodes;
this.run = run;
@@ -87,7 +85,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
this.distributedJoins = distributedJoins;
this.mvccTracker = mvccTracker;
- this.idxIter = run.indexes().iterator();
+ idxIter = run.indexes().iterator();
advance();
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
similarity index 95%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
index 482752a..4dcfbbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
@@ -54,7 +54,7 @@ import static java.util.Collections.emptyIterator;
/**
* Sorted index.
*/
-public final class GridMergeIndexSorted extends GridMergeIndex {
+public final class ReduceIndexSorted extends ReduceIndex {
/** */
private static final IndexType TYPE = IndexType.createNonUnique(false);
@@ -84,7 +84,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
private final Condition notEmpty = lock.newCondition();
/** */
- private GridResultPage failPage;
+ private ReduceResultPage failPage;
/** */
private MergeStreamIterator it;
@@ -95,9 +95,9 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
* @param name Index name,
* @param cols Columns.
*/
- public GridMergeIndexSorted(
+ public ReduceIndexSorted(
GridKernalContext ctx,
- GridMergeTable tbl,
+ ReduceTable tbl,
String name,
IndexColumn[] cols
) {
@@ -132,7 +132,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
}
/** {@inheritDoc} */
- @Override protected void addPage0(GridResultPage page) {
+ @Override protected void addPage0(ReduceResultPage page) {
if (page.isFail()) {
lock.lock();
@@ -295,7 +295,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
/**
* Row stream.
*/
- private final class RowStream implements Pollable<GridResultPage> {
+ private final class RowStream implements Pollable<ReduceResultPage> {
/** */
Iterator<Value[]> iter = emptyIterator();
@@ -303,12 +303,12 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
Row cur;
/** */
- GridResultPage nextPage;
+ ReduceResultPage nextPage;
/**
* @param page Page.
*/
- private void addPage(GridResultPage page) {
+ private void addPage(ReduceResultPage page) {
assert !page.isFail();
if (page.isLast() && page.rowsInPage() == 0)
@@ -330,7 +330,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
}
/** {@inheritDoc} */
- @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
+ @Override public ReduceResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lock();
@@ -340,7 +340,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
if (failPage != null)
return failPage;
- GridResultPage page = nextPage;
+ ReduceResultPage page = nextPage;
if (page != null) {
// isLast && !isDummyLast
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
similarity index 90%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
index 4e9d11a..b0f1c5e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
@@ -42,12 +42,12 @@ import org.h2.value.Value;
/**
* Unsorted merge index.
*/
-public final class GridMergeIndexUnsorted extends GridMergeIndex {
+public final class ReduceIndexUnsorted extends ReduceIndex {
/** */
private static final IndexType TYPE = IndexType.createScan(false);
/** */
- private final PollableQueue<GridResultPage> queue = new PollableQueue<>();
+ private final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
/** */
private final AtomicInteger activeSources = new AtomicInteger(-1);
@@ -60,7 +60,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
* @param tbl Table.
* @param name Index name.
*/
- public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) {
+ public ReduceIndexUnsorted(GridKernalContext ctx, ReduceTable tbl, String name) {
super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
}
@@ -68,14 +68,14 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
* @param ctx Context.
* @return Dummy index instance.
*/
- public static GridMergeIndexUnsorted createDummy(GridKernalContext ctx) {
- return new GridMergeIndexUnsorted(ctx);
+ public static ReduceIndexUnsorted createDummy(GridKernalContext ctx) {
+ return new ReduceIndexUnsorted(ctx);
}
/**
* @param ctx Context.
*/
- private GridMergeIndexUnsorted(GridKernalContext ctx) {
+ private ReduceIndexUnsorted(GridKernalContext ctx) {
super(ctx);
}
@@ -100,7 +100,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
}
/** {@inheritDoc} */
- @Override protected void addPage0(GridResultPage page) {
+ @Override protected void addPage0(ReduceResultPage page) {
assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
// Do not add empty page to avoid premature stream termination.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
index 608f307..c898a5f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
@@ -578,7 +578,7 @@ public class ReducePartitionMapper {
private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) {
Collection<ClusterNode> res = ctx.discovery().cacheGroupAffinityNodes(grpId, topVer);
- return res != null ? res : Collections.<ClusterNode>emptySet();
+ return res != null ? res : Collections.emptySet();
}
/**
@@ -616,7 +616,7 @@ public class ReducePartitionMapper {
* @param cacheIds Cache IDs.
* @return The first partitioned cache context.
*/
- private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
+ public GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
for (int i = 0; i < cacheIds.size(); i++) {
GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index d2f7e9a..91ab3e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -34,9 +34,9 @@ import org.jetbrains.annotations.Nullable;
/**
* Query run.
*/
-class ReduceQueryRun {
+public class ReduceQueryRun {
/** */
- private final List<GridMergeIndex> idxs;
+ private final List<ReduceIndex> idxs;
/** */
private CountDownLatch latch;
@@ -72,7 +72,9 @@ class ReduceQueryRun {
Boolean dataPageScanEnabled
) {
this.conn = (JdbcConnection)conn;
- this.idxs = new ArrayList<>(idxsCnt);
+
+ idxs = new ArrayList<>(idxsCnt);
+
this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
this.selectForUpdateFut = selectForUpdateFut;
this.dataPageScanEnabled = dataPageScanEnabled;
@@ -130,7 +132,7 @@ class ReduceQueryRun {
while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
latch.countDown();
- for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+ for (ReduceIndex idx : idxs) // Fail all merge indexes.
idx.fail(state.nodeId, state.ex);
}
@@ -199,7 +201,7 @@ class ReduceQueryRun {
/**
* @return Indexes.
*/
- List<GridMergeIndex> indexes() {
+ List<ReduceIndex> indexes() {
return idxs;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
similarity index 96%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
index 0cb986b..4437dd6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
@@ -17,14 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
@@ -34,12 +26,21 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.value.Value;
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray;
/**
* Page result.
*/
-public class GridResultPage {
+public class ReduceResultPage {
/** */
private final UUID src;
@@ -61,7 +62,7 @@ public class GridResultPage {
* @param res Response.
*/
@SuppressWarnings("unchecked")
- public GridResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) {
+ public ReduceResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) {
assert src != null;
this.src = src;
@@ -158,7 +159,7 @@ public class GridResultPage {
* @param last Last page for a source.
* @return {@code this}.
*/
- public GridResultPage setLast(boolean last) {
+ public ReduceResultPage setLast(boolean last) {
this.last = last;
return this;
@@ -214,6 +215,6 @@ public class GridResultPage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridResultPage.class, this);
+ return S.toString(ReduceResultPage.class, this);
}
}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java
new file mode 100644
index 0000000..f380145
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceScanIndex.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
+import org.h2.engine.Session;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.TableFilter;
+
+import java.util.HashSet;
+
+/**
+ * Scan index wrapper.
+ */
+public class ReduceScanIndex extends GridH2ScanIndex<ReduceIndex> {
+ /**
+ * @param delegate Delegate.
+ */
+ public ReduceScanIndex(ReduceIndex delegate) {
+ super(delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
+ SortOrder sortOrder, HashSet<Column> allColumnsSet) {
+ long rows = getRowCountApproximation();
+
+ return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true, allColumnsSet);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java
new file mode 100644
index 0000000..e673722
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceSourceKey.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.UUID;
+
+/**
+ * Reduce source key for a specific remote data source (remote node + specific segment).
+ */
+public class ReduceSourceKey {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Segment. */
+ private final int segment;
+
+ /**
+ * @param nodeId Node ID.
+ * @param segment Segment.
+ */
+ public ReduceSourceKey(UUID nodeId, int segment) {
+ this.nodeId = nodeId;
+ this.segment = segment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ReduceSourceKey other = (ReduceSourceKey)o;
+
+ return F.eq(segment, other.segment) && F.eq(nodeId, other.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * nodeId.hashCode() + segment;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
similarity index 79%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
index 681917f..c520c2a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.ArrayList;
-import java.util.HashSet;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
import org.apache.ignite.internal.util.typedef.F;
@@ -28,24 +27,24 @@ import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
-import org.h2.result.SortOrder;
-import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableBase;
-import org.h2.table.TableFilter;
import org.h2.table.TableType;
/**
- * Merge table for distributed queries.
+ * Table for reduce phase. Created for every split map query. Tables are created in H2 through SQL statement.
+ * In order to avoid overhead on SQL execution for every query, we create {@link ReduceTableWrapper} instead
+ * and set real {@code ReduceTable} through thread-local during query execution. This allows us to have only
+ * a very limited number of physical tables in H2 for all user requests.
*/
-public class GridMergeTable extends TableBase {
+public class ReduceTable extends TableBase {
/** */
private ArrayList<Index> idxs;
/**
* @param data Data.
*/
- public GridMergeTable(CreateTableData data) {
+ public ReduceTable(CreateTableData data) {
super(data);
}
@@ -61,16 +60,16 @@ public class GridMergeTable extends TableBase {
/**
* @return Merge index.
*/
- public GridMergeIndex getMergeIndex() {
- return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+ public ReduceIndex getMergeIndex() {
+ return (ReduceIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
}
/**
* @param idx Index.
* @return Scan index.
*/
- public static GridH2ScanIndex<GridMergeIndex> createScanIndex(GridMergeIndex idx) {
- return new ScanIndex(idx);
+ public static GridH2ScanIndex<ReduceIndex> createScanIndex(ReduceIndex idx) {
+ return new ReduceScanIndex(idx);
}
/** {@inheritDoc} */
@@ -178,24 +177,4 @@ public class GridMergeTable extends TableBase {
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
-
- /**
- * Scan index wrapper.
- */
- private static class ScanIndex extends GridH2ScanIndex<GridMergeIndex> {
- /**
- * @param delegate Delegate.
- */
- public ScanIndex(GridMergeIndex delegate) {
- super(delegate);
- }
-
- /** {@inheritDoc} */
- @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
- SortOrder sortOrder, HashSet<Column> allColumnsSet) {
- long rows = getRowCountApproximation();
-
- return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true, allColumnsSet);
- }
- }
}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java
new file mode 100644
index 0000000..1ca2e48
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableEngine.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.h2.api.TableEngine;
+import org.h2.command.ddl.CreateTableData;
+import org.h2.table.Table;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
+
+/**
+ * Engine to create reduce table.
+ */
+public class ReduceTableEngine implements TableEngine {
+ /** */
+ private static final ThreadLocal<ReduceTableWrapper> CREATED_TBL = new ThreadLocal<>();
+
+ /**
+ * Create merge table over the given connection with provided index.
+ *
+ * @param conn Connection.
+ * @param idx Index.
+ * @return Created table.
+ */
+ public static ReduceTableWrapper create(Connection conn, int idx) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
+ "(fake BOOL) ENGINE \"" + ReduceTableEngine.class.getName() + '"');
+ }
+ catch (SQLException e) {
+ throw new IllegalStateException(e);
+ }
+
+ ReduceTableWrapper tbl = CREATED_TBL.get();
+
+ assert tbl != null;
+
+ CREATED_TBL.remove();
+
+ return tbl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table createTable(CreateTableData d) {
+ assert CREATED_TBL.get() == null;
+
+ ReduceTableWrapper tbl = new ReduceTableWrapper(
+ d.schema,
+ d.id,
+ d.tableName,
+ d.persistIndexes,
+ d.persistData
+ );
+
+ CREATED_TBL.set(tbl);
+
+ return tbl;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
similarity index 85%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
index b01c3d4..ddc0898 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTableWrapper.java
@@ -21,8 +21,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import javax.cache.CacheException;
-import org.h2.api.TableEngine;
-import org.h2.command.ddl.CreateTableData;
import org.h2.engine.DbObject;
import org.h2.engine.Session;
import org.h2.index.Index;
@@ -41,11 +39,12 @@ import org.h2.table.TableType;
import org.h2.value.Value;
/**
- * Thread local table wrapper for another table instance.
+ * Thread local table wrapper for real reducer table. All reduce queries share the same small set of these fake tables.
+ * During reduce query execution every thread installs its own real reduce tables into thread-local storage.
*/
-public class GridThreadLocalTable extends Table {
+public class ReduceTableWrapper extends Table {
/** Delegate table */
- private final ThreadLocal<Table> tbl = new ThreadLocal<>();
+ private final ThreadLocal<ReduceTable> tbl = new ThreadLocal<>();
/**
* @param schema Schema.
@@ -54,14 +53,14 @@ public class GridThreadLocalTable extends Table {
* @param persistIndexes Persist indexes.
* @param persistData Persist data.
*/
- public GridThreadLocalTable(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
+ public ReduceTableWrapper(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) {
super(schema, id, name, persistIndexes, persistData);
}
/**
* @param t Table or {@code null} to reset existing.
*/
- public void innerTable(Table t) {
+ public void innerTable(ReduceTable t) {
if (t == null)
tbl.remove();
else
@@ -259,37 +258,4 @@ public class GridThreadLocalTable extends Table {
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
-
- /**
- * Engine.
- */
- public static class Engine implements TableEngine {
- /** */
- private static ThreadLocal<GridThreadLocalTable> createdTbl = new ThreadLocal<>();
-
- /**
- * @return Created table.
- */
- public static GridThreadLocalTable getCreated() {
- GridThreadLocalTable tbl = createdTbl.get();
-
- assert tbl != null;
-
- createdTbl.remove();
-
- return tbl;
- }
-
- /** {@inheritDoc} */
- @Override public Table createTable(CreateTableData d) {
- assert createdTbl.get() == null;
-
- GridThreadLocalTable tbl = new GridThreadLocalTable(d.schema, d.id, d.tableName, d.persistIndexes,
- d.persistData);
-
- createdTbl.set(tbl);
-
- return tbl;
- }
- }
}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index d6937b1..73964a4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -47,7 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMergeIndex;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReduceIndex;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -575,7 +575,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
Integer.class, Value.class));
try {
- GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 8);
+ GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 8);
Random rnd = new GridRandom();
@@ -625,7 +625,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
}
}
finally {
- GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 1024);
+ GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 1024);
c.destroy();
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
index 7e0aaa5..68c3e09 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NoneOrSinglePartitionsQueryOptimizationsTest.java
@@ -467,7 +467,7 @@ public class NoneOrSinglePartitionsQueryOptimizationsTest extends GridCommonAbst
if (expMergeTbl)
assertTrue(innerIter instanceof H2ResultSetIterator);
else
- assertTrue(innerIter instanceof GridMergeIndexIterator);
+ assertTrue(innerIter instanceof ReduceIndexIterator);
List<List<?>> all = new ArrayList<>();