You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/04 13:34:46 UTC
[ignite] branch master updated: IGNITE-11185: SQL: moved index
distributed join logic to H2TreeIndex. Minor changes to collocation model.
This closes #6012.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7b61c32 IGNITE-11185: SQL: moved index distributed join logic to H2TreeIndex. Minor changes to collocation model. This closes #6012.
7b61c32 is described below
commit 7b61c32a1c5ab443b315d5a87fec43d19b39e6fc
Author: devozerov <vo...@gridgain.com>
AuthorDate: Mon Feb 4 16:32:52 2019 +0300
IGNITE-11185: SQL: moved index distributed join logic to H2TreeIndex. Minor changes to collocation model. This closes #6012.
---
.../query/h2/opt/GridH2SpatialIndex.java | 8 +-
.../internal/processors/query/h2/H2Utils.java | 15 +
.../query/h2/database/H2PkHashIndex.java | 3 +-
.../query/h2/database/H2TreeClientIndex.java | 49 +--
.../processors/query/h2/database/H2TreeIndex.java | 423 ++++++++++++++++++--
.../processors/query/h2/opt/GridH2IndexBase.java | 443 +--------------------
.../query/h2/opt/join/BroadcastCursor.java | 4 +-
.../query/h2/opt/join/CollocationModel.java | 219 ++++------
.../h2/opt/join/CollocationModelAffinity.java | 32 ++
.../h2/opt/join/CollocationModelMultiplier.java | 54 +++
.../query/h2/opt/join/CollocationModelType.java | 75 ++++
.../query/h2/opt/join/DistributedLookupBatch.java | 17 +-
.../processors/query/h2/opt/join/RangeSource.java | 6 +-
.../processors/query/h2/opt/join/RangeStream.java | 6 +-
.../query/h2/twostep/GridMergeIndexSorted.java | 4 +-
.../query/h2/twostep/GridReduceQueryExecutor.java | 3 +-
16 files changed, 709 insertions(+), 652 deletions(-)
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 4af31fb..f2f0b01 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2Cursor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
@@ -60,6 +61,9 @@ import org.locationtech.jts.geom.Geometry;
*/
@SuppressWarnings("unused"/*reflection*/)
public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex {
+ /** Cache context. */
+ private final GridCacheContext ctx;
+
/** */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -173,7 +177,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
assert key != null;
- final int seg = segmentForRow(row);
+ final int seg = segmentForRow(ctx, row);
Long rowId = keyToId.get(key);
@@ -249,7 +253,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
assert oldRow != null;
- final int seg = segmentForRow(row);
+ final int seg = segmentForRow(ctx, row);
if (!segments[seg].remove(getEnvelope(row, rowId), rowId))
throw DbException.throwInternalError("row not found");
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 7078b43..684ecf7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -32,6 +32,7 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -728,4 +729,18 @@ public class H2Utils {
return prepared(s);
}
}
+
+ /**
+ * @param arr Array.
+ * @param off Offset.
+ * @param cmp Comparator.
+ */
+ public static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {
+ for (int i = off, last = arr.length - 1; i < last; i++) {
+ if (cmp.compare(arr[i], arr[i + 1]) <= 0)
+ break;
+
+ U.swap(arr, i, i + 1);
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 3216b94..7fa877e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -70,6 +70,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
* @param colsList Index columns.
* @param segments Segments.
*/
+ @SuppressWarnings("ZeroLengthArrayAllocation")
public H2PkHashIndex(
GridCacheContext<?, ?> cctx,
GridH2Table tbl,
@@ -81,7 +82,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
this.segments = segments;
- IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
+ IndexColumn[] cols = colsList.toArray(new IndexColumn[0]);
IndexColumn.mapColumns(cols, tbl);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
index 1c1104b..726ba10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.query.h2.database;
import java.util.List;
+
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -31,25 +33,15 @@ import org.h2.table.IndexColumn;
* We need indexes on an not affinity nodes. The index shouldn't contains any data.
*/
public class H2TreeClientIndex extends H2TreeIndexBase {
-
- /**
- *
- */
- public static final IgniteSQLException SHOULDNT_BE_INVOKED_EXCEPTION = new IgniteSQLException("Shouldn't be invoked, due to it's not affinity node");
-
/**
* @param tbl Table.
* @param name Index name.
* @param pk Primary key.
* @param colsList Index columns.
*/
- public H2TreeClientIndex(
- GridH2Table tbl,
- String name,
- boolean pk,
- List<IndexColumn> colsList
- ) {
- IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
+ @SuppressWarnings("ZeroLengthArrayAllocation")
+ public H2TreeClientIndex(GridH2Table tbl, String name, boolean pk, List<IndexColumn> colsList) {
+ IndexColumn[] cols = colsList.toArray(new IndexColumn[0]);
IndexColumn.mapColumns(cols, tbl);
@@ -59,51 +51,48 @@ public class H2TreeClientIndex extends H2TreeIndexBase {
/** {@inheritDoc} */
@Override public void refreshColumnIds() {
- //do nothing.
- }
-
- /** {@inheritDoc} */
- @Override public void destroy(boolean rmvIndex) {
- //do nothing.
+ // Do nothing.
}
/** {@inheritDoc} */
@Override public int segmentsCount() {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ throw unsupported();
}
/** {@inheritDoc} */
@Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ throw unsupported();
}
/** {@inheritDoc} */
@Override public H2CacheRow put(H2CacheRow row) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ throw unsupported();
}
/** {@inheritDoc} */
@Override public boolean putx(H2CacheRow row) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ throw unsupported();
}
/** {@inheritDoc} */
@Override public boolean removex(SearchRow row) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ throw unsupported();
}
/** {@inheritDoc} */
@Override public long getRowCount(Session ses) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ throw unsupported();
}
/** {@inheritDoc} */
- @Override public Cursor findFirstOrLast(Session session, boolean b) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ @Override public Cursor findFirstOrLast(Session session, boolean first) {
+ throw unsupported();
}
- /** {@inheritDoc} */
- @Override protected H2Tree treeForRead(int segment) {
- throw SHOULDNT_BE_INVOKED_EXCEPTION;
+ /**
+ * @return Exception about unsupported operation.
+ */
+ private static IgniteException unsupported() {
+ return new IgniteSQLException("Shouldn't be invoked on non-affinity node.");
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index d52bec4..2744923 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -18,12 +18,21 @@
package org.apache.ignite.internal.processors.query.h2.database;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
@@ -33,26 +42,60 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.h2.H2Cursor;
import org.apache.ignite.internal.processors.query.h2.H2RowCache;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
+import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
import org.apache.ignite.internal.stat.IoStatisticsType;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.h2.engine.Session;
import org.h2.index.Cursor;
+import org.h2.index.IndexCondition;
+import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.index.SingleRowCursor;
import org.h2.message.DbException;
import org.h2.result.SearchRow;
import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
+import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
+import javax.cache.CacheException;
+
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
+import static org.h2.result.Row.MEMORY_CALCULATE;
+
/**
* H2 Index over {@link BPlusTree}.
*/
@@ -67,6 +110,9 @@ public class H2TreeIndex extends H2TreeIndexBase {
/** */
private final List<InlineIndexHelper> inlineIdxs;
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
/** Cache context. */
private final GridCacheContext<?, ?> cctx;
@@ -88,10 +134,23 @@ public class H2TreeIndex extends H2TreeIndexBase {
/** */
private final IgniteLogger log;
+ /** */
+ private final Object msgTopic;
+
+ /** */
+ private final GridMessageListener msgLsnr;
+
+ /** */
+ private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
+ @Override public void applyx(ClusterNode locNode, Message msg) {
+ onMessage0(locNode.id(), msg);
+ }
+ };
+
/**
* @param cctx Cache context.
* @param rowCache Row cache.
- * @param tbl Table.
+ * @param table Table.
* @param idxName Index name.
* @param pk Primary key.
* @param affinityKey {@code true} for affinity key.
@@ -101,10 +160,11 @@ public class H2TreeIndex extends H2TreeIndexBase {
* @param segmentsCnt Count of tree segments.
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("MapReplaceableByEnumMap")
public H2TreeIndex(
GridCacheContext<?, ?> cctx,
@Nullable H2RowCache rowCache,
- GridH2Table tbl,
+ GridH2Table table,
String idxName,
boolean pk,
boolean affinityKey,
@@ -115,23 +175,25 @@ public class H2TreeIndex extends H2TreeIndexBase {
) throws IgniteCheckedException {
assert segmentsCnt > 0 : segmentsCnt;
- this.cctx = cctx;
+ ctx = cctx.kernalContext();
+ log = ctx.log(getClass());
- this.log = cctx.logger(getClass().getName());
+ this.cctx = cctx;
this.pk = pk;
this.affinityKey = affinityKey;
- this.tblName = tbl.getName();
+ tblName = table.getName();
+
this.idxName = idxName;
- this.table = tbl;
+ this.table = table;
- GridQueryTypeDescriptor typeDesc = tbl.rowDescriptor().type();
+ GridQueryTypeDescriptor typeDesc = table.rowDescriptor().type();
int typeId = cctx.binaryMarshaller() ? typeDesc.typeId() : typeDesc.valueClass().hashCode();
- treeName = BPlusTree.treeName((tbl.rowDescriptor() == null ? "" : typeId + "_") + idxName, "H2Tree");
+ treeName = BPlusTree.treeName((table.rowDescriptor() == null ? "" : typeId + "_") + idxName, "H2Tree");
IndexColumnsInfo unwrappedColsInfo = new IndexColumnsInfo(unwrappedColsList, inlineSize);
@@ -161,11 +223,11 @@ public class H2TreeIndex extends H2TreeIndexBase {
segments[i] = new H2Tree(
cctx,
- tbl,
+ table,
treeName,
idxName,
tblName,
- tbl.cacheName(),
+ table.cacheName(),
cctx.offheap().reuseListForIndex(treeName),
cctx.groupId(),
cctx.dataRegion().pageMemory(),
@@ -198,12 +260,31 @@ public class H2TreeIndex extends H2TreeIndexBase {
inlineIdxs = colsInfo.inlineIdx();
- IndexColumn.mapColumns(cols, tbl);
+ IndexColumn.mapColumns(cols, table);
- initBaseIndex(tbl, 0, idxName, cols,
+ initBaseIndex(table, 0, idxName, cols,
pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
- initDistributedJoinMessaging(tbl);
+ // Initialize distributed joins.
+ msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, table.identifierString() + '.' + getName());
+
+ msgLsnr = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+ GridSpinBusyLock l = table.rowDescriptor().indexing().busyLock();
+
+ if (!l.enterBusy())
+ return;
+
+ try {
+ onMessage0(nodeId, msg);
+ }
+ finally {
+ l.leaveBusy();
+ }
+ }
+ };
+
+ ctx.io().addMessageListener(msgTopic, msgLsnr);
}
/**
@@ -303,7 +384,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
- int seg = segmentForRow(row);
+ int seg = segmentForRow(cctx, row);
H2Tree tree = treeForRead(seg);
@@ -324,7 +405,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
- int seg = segmentForRow(row);
+ int seg = segmentForRow(cctx, row);
H2Tree tree = treeForRead(seg);
@@ -347,7 +428,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
- int seg = segmentForRow(row);
+ int seg = segmentForRow(cctx, row);
H2Tree tree = treeForRead(seg);
@@ -411,17 +492,24 @@ public class H2TreeIndex extends H2TreeIndexBase {
throw new IgniteException(e);
}
finally {
- super.destroy(rmvIdx);
+ if (msgLsnr != null)
+ ctx.io().removeMessageListener(msgTopic, msgLsnr);
}
}
- /** {@inheritDoc} */
- @Override protected H2Tree treeForRead(int segment) {
+ /**
+ * @param segment Segment Id.
+ * @return Snapshot for requested segment if there is one.
+ */
+ private H2Tree treeForRead(int segment) {
return segments[segment];
}
- /** {@inheritDoc} */
- @Override protected BPlusTree.TreeRowClosure<H2Row, H2Row> filter(GridH2QueryContext qctx) {
+ /**
+ * @param qctx Query context.
+ * @return Row filter.
+ */
+ private BPlusTree.TreeRowClosure<H2Row, H2Row> filter(GridH2QueryContext qctx) {
if (qctx == null) {
assert !cctx.mvccEnabled();
@@ -510,14 +598,299 @@ public class H2TreeIndex extends H2TreeIndexBase {
inlineIdxs.set(pos, inlineHelpers.get(pos));
}
+ /** {@inheritDoc} */
+ @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
+ GridH2QueryContext qctx = GridH2QueryContext.get();
+
+ if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
+ return null;
+
+ IndexColumn affCol = getTable().getAffinityKeyColumn();
+ GridH2RowDescriptor desc = getTable().rowDescriptor();
+
+ int affColId = -1;
+ boolean ucast = false;
+
+ if (affCol != null) {
+ affColId = affCol.column.getColumnId();
+ int[] masks = filters[filter].getMasks();
+
+ if (masks != null) {
+ ucast = (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
+ desc.checkKeyIndexCondition(masks, IndexCondition.EQUALITY);
+ }
+ }
+
+ GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
+
+ return new DistributedLookupBatch(this, cctx, ucast, affColId);
+ }
+
+ /**
+ * @param nodes Nodes.
+ * @param msg Message.
+ */
+ public void send(Collection<ClusterNode> nodes, Message msg) {
+ boolean res = getTable().rowDescriptor().indexing().send(msgTopic,
+ -1,
+ nodes,
+ msg,
+ null,
+ locNodeHnd,
+ GridIoPolicy.IDX_POOL,
+ false
+ );
+
+ if (!res)
+ throw H2Utils.retryException("Failed to send message to nodes: " + nodes);
+ }
+
+ /**
+ * @param nodeId Source node ID.
+ * @param msg Message.
+ */
+ private void onMessage0(UUID nodeId, Object msg) {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ return;
+
+ try {
+ if (msg instanceof GridH2IndexRangeRequest)
+ onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
+ else if (msg instanceof GridH2IndexRangeResponse)
+ onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
+ }
+ catch (Throwable th) {
+ U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
+
+ if (th instanceof Error)
+ throw th;
+ }
+ }
+
+ /**
+ * @param node Requesting node.
+ * @param msg Request message.
+ */
+ private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
+ GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
+
+ res.originNodeId(msg.originNodeId());
+ res.queryId(msg.queryId());
+ res.originSegmentId(msg.originSegmentId());
+ res.segment(msg.segment());
+ res.batchLookupId(msg.batchLookupId());
+
+ GridH2QueryContext qctx = GridH2QueryContext.get(
+ ctx.localNodeId(),
+ msg.originNodeId(),
+ msg.queryId(),
+ msg.originSegmentId(),
+ MAP
+ );
+
+ if (qctx == null)
+ res.status(STATUS_NOT_FOUND);
+ else {
+ DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+ assert joinCtx != null;
+
+ try {
+ RangeSource src;
+
+ if (msg.bounds() != null) {
+ // This is the first request containing all the search rows.
+ assert !msg.bounds().isEmpty() : "empty bounds";
+
+ src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx));
+ }
+ else {
+ // This is request to fetch next portion of data.
+ src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
+
+ assert src != null;
+ }
+
+ List<GridH2RowRange> ranges = new ArrayList<>();
+
+ int maxRows = joinCtx.pageSize();
+
+ assert maxRows > 0 : maxRows;
+
+ while (maxRows > 0) {
+ GridH2RowRange range = src.next(maxRows);
+
+ if (range == null)
+ break;
+
+ ranges.add(range);
+
+ if (range.rows() != null)
+ maxRows -= range.rows().size();
+ }
+
+ assert !ranges.isEmpty();
+
+ if (src.hasMoreRows()) {
+ // Save source for future fetches.
+ if (msg.bounds() != null)
+ joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
+ }
+ else if (msg.bounds() == null) {
+ // Drop saved source.
+ joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
+ }
+
+ res.ranges(ranges);
+ res.status(STATUS_OK);
+ }
+ catch (Throwable th) {
+ U.error(log, "Failed to process request: " + msg, th);
+
+ res.error(th.getClass() + ": " + th.getMessage());
+ res.status(STATUS_ERROR);
+ }
+ }
+
+ send(singletonList(node), res);
+ }
+
/**
+ * @param node Responded node.
+ * @param msg Response message.
+ */
+ private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
+ GridH2QueryContext qctx = GridH2QueryContext.get(
+ ctx.localNodeId(),
+ msg.originNodeId(),
+ msg.queryId(),
+ msg.originSegmentId(),
+ MAP
+ );
+
+ if (qctx == null)
+ return;
+
+ DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+ assert joinCtx != null;
+
+ Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());
+
+ if (streams == null)
+ return;
+
+ RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
+
+ assert stream != null;
+
+ stream.onResponse(msg);
+ }
+
+ /**
+ * Find rows for the segments (distributed joins).
*
+ * @param bounds Bounds.
+ * @param segment Segment.
+ * @param filter Filter.
+ * @return Iterator.
*/
+ @SuppressWarnings("unchecked")
+ public Iterator<H2Row> findForSegment(GridH2RowRangeBounds bounds, int segment,
+ BPlusTree.TreeRowClosure<H2Row, H2Row> filter) {
+ SearchRow first = toSearchRow(bounds.first());
+ SearchRow last = toSearchRow(bounds.last());
+
+ IgniteTree t = treeForRead(segment);
+
+ try {
+ GridCursor<H2Row> range = ((BPlusTree)t).find(first, last, filter, null);
+
+ if (range == null)
+ range = H2Utils.EMPTY_CURSOR;
+
+ H2Cursor cur = new H2Cursor(range);
+
+ return new CursorIteratorWrapper(cur);
+ }
+ catch (IgniteCheckedException e) {
+ throw DbException.convert(e);
+ }
+ }
+
+ /**
+ * @param msg Row message.
+ * @return Search row.
+ */
+ private SearchRow toSearchRow(GridH2RowMessage msg) {
+ if (msg == null)
+ return null;
+
+ Value[] vals = new Value[getTable().getColumns().length];
+
+ assert vals.length > 0;
+
+ List<GridH2ValueMessage> msgVals = msg.values();
+
+ for (int i = 0; i < indexColumns.length; i++) {
+ if (i >= msgVals.size())
+ continue;
+
+ try {
+ vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
+ }
+
+ return database.createRow(vals, MEMORY_CALCULATE);
+ }
+
+ /**
+ * @param row Search row.
+ * @return Row message.
+ */
+ public GridH2RowMessage toSearchRowMessage(SearchRow row) {
+ if (row == null)
+ return null;
+
+ List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length);
+
+ for (IndexColumn idxCol : indexColumns) {
+ Value val = row.getValue(idxCol.column.getColumnId());
+
+ if (val == null)
+ break;
+
+ try {
+ vals.add(GridH2ValueMessageFactory.toMessage(val));
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
+ }
+
+ GridH2RowMessage res = new GridH2RowMessage();
+
+ res.values(vals);
+
+ return res;
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings({"PublicInnerClass", "AssignmentOrReturnOfFieldWithMutableType"})
public class IndexColumnsInfo {
/** */
private final int inlineSize;
+
/** */
private final IndexColumn[] cols;
+
/** */
private final List<InlineIndexHelper> inlineIdx;
@@ -525,12 +898,12 @@ public class H2TreeIndex extends H2TreeIndexBase {
* @param colsList Index columns list
* @param cfgInlineSize Inline size from cache config.
*/
+ @SuppressWarnings("ZeroLengthArrayAllocation")
public IndexColumnsInfo(List<IndexColumn> colsList, int cfgInlineSize) {
- this.cols = colsList.toArray(new IndexColumn[0]);
-
- this.inlineIdx = getAvailableInlineColumns(cols);
+ cols = colsList.toArray(new IndexColumn[0]);
- this.inlineSize = computeInlineSize(inlineIdx, cfgInlineSize);
+ inlineIdx = getAvailableInlineColumns(cols);
+ inlineSize = computeInlineSize(inlineIdx, cfgInlineSize);
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index c53d0ec..4be96cd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -17,135 +17,23 @@
package org.apache.ignite.internal.processors.query.h2.opt;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.H2Cursor;
-import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModelMultiplier;
import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
-import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
-import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
-import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
-import org.apache.ignite.internal.processors.query.h2.sql.SplitterContext;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteTree;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.typedef.CIX2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.logger.NullLogger;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
-import org.h2.index.IndexCondition;
-import org.h2.index.IndexLookupBatch;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
-import org.h2.table.IndexColumn;
import org.h2.table.TableFilter;
import org.h2.value.Value;
-import javax.cache.CacheException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import static java.util.Collections.singletonList;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
-import static org.h2.result.Row.MEMORY_CALCULATE;
-
/**
* Index base.
*/
public abstract class GridH2IndexBase extends BaseIndex {
- /** */
- public static final Object EXPLICIT_NULL = new Object();
-
- /** */
- private Object msgTopic;
-
- /** */
- private GridMessageListener msgLsnr;
-
- /** */
- private IgniteLogger log;
-
- /** */
- private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
- @Override public void applyx(ClusterNode clusterNode, Message msg) {
- onMessage0(clusterNode.id(), msg);
- }
- };
-
- protected GridCacheContext<?, ?> ctx;
-
- /**
- * @param tbl Table.
- */
- @SuppressWarnings("MapReplaceableByEnumMap")
- protected final void initDistributedJoinMessaging(GridH2Table tbl) {
- final GridH2RowDescriptor desc = tbl.rowDescriptor();
-
- if (desc != null && desc.context() != null) {
- ctx = desc.context();
-
- GridKernalContext ctx = desc.context().kernalContext();
-
- log = ctx.log(getClass());
-
- msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
-
- msgLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
- GridSpinBusyLock l = desc.indexing().busyLock();
-
- if (!l.enterBusy())
- return;
-
- try {
- onMessage0(nodeId, msg);
- }
- finally {
- l.leaveBusy();
- }
- }
- };
-
- ctx.io().addMessageListener(msgTopic, msgLsnr);
- }
- else {
- msgTopic = null;
- msgLsnr = null;
- log = new NullLogger();
- }
- }
-
/** {@inheritDoc} */
@Override public final void close(Session ses) {
// No-op. Actual index destruction must happen in method destroy.
@@ -159,8 +47,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param rmv Flag remove.
*/
public void destroy(boolean rmv) {
- if (msgLsnr != null)
- kernalContext().io().removeMessageListener(msgTopic, msgLsnr);
+ // No-op.
}
/**
@@ -209,18 +96,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @return Multiplier.
*/
public final int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
- // We do optimizations with respect to distributed joins only on PREPARE stage only.
- // Notice that we check for isJoinBatchEnabled, because we can do multiple different
- // optimization passes on PREPARE stage.
- // Query expressions can not be distributed as well.
- SplitterContext ctx = SplitterContext.get();
-
- if (!ctx.distributedJoins() || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
- return CollocationModel.MULTIPLIER_COLLOCATED;
+ CollocationModelMultiplier mul = CollocationModel.distributedMultiplier(ses, filters, filter);
- assert filters != null;
-
- return CollocationModel.distributedMultiplier(ctx, ses, filters, filter);
+ return mul.multiplier();
}
/** {@inheritDoc} */
@@ -264,283 +142,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
- if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
- return null;
-
- IndexColumn affCol = getTable().getAffinityKeyColumn();
- GridH2RowDescriptor desc = getTable().rowDescriptor();
-
- int affColId = -1;
- boolean ucast = false;
-
- if (affCol != null) {
- affColId = affCol.column.getColumnId();
- int[] masks = filters[filter].getMasks();
-
- if (masks != null) {
- ucast = (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
- desc.checkKeyIndexCondition(masks, IndexCondition.EQUALITY);
- }
- }
-
- GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
-
- return new DistributedLookupBatch(this, cctx, ucast, affColId);
- }
-
- /** {@inheritDoc} */
@Override public void removeChildrenAndResources(Session session) {
// The sole purpose of this override is to pass session to table.removeIndex
assert table instanceof GridH2Table;
((GridH2Table)table).removeIndex(session, this);
- remove(session);
- database.removeMeta(session, getId());
- }
-
- /**
- * @param nodes Nodes.
- * @param msg Message.
- */
- public void send(Collection<ClusterNode> nodes, Message msg) {
- if (!getTable().rowDescriptor().indexing().send(msgTopic,
- -1,
- nodes,
- msg,
- null,
- locNodeHnd,
- GridIoPolicy.IDX_POOL,
- false))
- throw H2Utils.retryException("Failed to send message to nodes: " + nodes);
- }
-
- /**
- * @param nodeId Source node ID.
- * @param msg Message.
- */
- private void onMessage0(UUID nodeId, Object msg) {
- ClusterNode node = kernalContext().discovery().node(nodeId);
-
- if (node == null)
- return;
-
- try {
- if (msg instanceof GridH2IndexRangeRequest)
- onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
- else if (msg instanceof GridH2IndexRangeResponse)
- onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
- }
- catch (Throwable th) {
- U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
-
- if (th instanceof Error)
- throw th;
- }
- }
-
- /**
- * @return Kernal context.
- */
- private GridKernalContext kernalContext() {
- return getTable().rowDescriptor().context().kernalContext();
- }
-
- /**
- * @param node Requesting node.
- * @param msg Request message.
- */
- private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
- GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
-
- res.originNodeId(msg.originNodeId());
- res.queryId(msg.queryId());
- res.originSegmentId(msg.originSegmentId());
- res.segment(msg.segment());
- res.batchLookupId(msg.batchLookupId());
-
- GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(),
- msg.queryId(), msg.originSegmentId(), MAP);
-
- if (qctx == null)
- res.status(STATUS_NOT_FOUND);
- else {
- DistributedJoinContext joinCtx = qctx.distributedJoinContext();
-
- assert joinCtx != null;
-
- try {
- RangeSource src;
-
- if (msg.bounds() != null) {
- // This is the first request containing all the search rows.
- assert !msg.bounds().isEmpty() : "empty bounds";
-
- src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx));
- }
- else {
- // This is request to fetch next portion of data.
- src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
-
- assert src != null;
- }
-
- List<GridH2RowRange> ranges = new ArrayList<>();
-
- int maxRows = joinCtx.pageSize();
-
- assert maxRows > 0 : maxRows;
-
- while (maxRows > 0) {
- GridH2RowRange range = src.next(maxRows);
-
- if (range == null)
- break;
-
- ranges.add(range);
-
- if (range.rows() != null)
- maxRows -= range.rows().size();
- }
-
- assert !ranges.isEmpty();
-
- if (src.hasMoreRows()) {
- // Save source for future fetches.
- if (msg.bounds() != null)
- joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
- }
- else if (msg.bounds() == null) {
- // Drop saved source.
- joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
- }
- res.ranges(ranges);
- res.status(STATUS_OK);
- }
- catch (Throwable th) {
- U.error(log, "Failed to process request: " + msg, th);
-
- res.error(th.getClass() + ": " + th.getMessage());
- res.status(STATUS_ERROR);
- }
- }
-
- send(singletonList(node), res);
- }
-
- /**
- * @param qctx Query context.
- * @return Row filter.
- */
- protected BPlusTree.TreeRowClosure<H2Row, H2Row> filter(GridH2QueryContext qctx) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @param node Responded node.
- * @param msg Response message.
- */
- private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
- GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
- msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP);
-
- if (qctx == null)
- return;
-
- DistributedJoinContext joinCtx = qctx.distributedJoinContext();
-
- assert joinCtx != null;
-
- Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());
-
- if (streams == null)
- return;
-
- RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
-
- assert stream != null;
-
- stream.onResponse(msg);
- }
-
- /**
- * @param msg Row message.
- * @return Search row.
- */
- private SearchRow toSearchRow(GridH2RowMessage msg) {
- if (msg == null)
- return null;
-
- GridKernalContext ctx = kernalContext();
-
- Value[] vals = new Value[getTable().getColumns().length];
-
- assert vals.length > 0;
-
- List<GridH2ValueMessage> msgVals = msg.values();
-
- for (int i = 0; i < indexColumns.length; i++) {
- if (i >= msgVals.size())
- continue;
-
- try {
- vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx);
- }
- catch (IgniteCheckedException e) {
- throw new CacheException(e);
- }
- }
-
- return database.createRow(vals, MEMORY_CALCULATE);
- }
-
- /**
- * @param row Search row.
- * @return Row message.
- */
- public GridH2RowMessage toSearchRowMessage(SearchRow row) {
- if (row == null)
- return null;
-
- List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length);
-
- for (IndexColumn idxCol : indexColumns) {
- Value val = row.getValue(idxCol.column.getColumnId());
-
- if (val == null)
- break;
-
- try {
- vals.add(GridH2ValueMessageFactory.toMessage(val));
- }
- catch (IgniteCheckedException e) {
- throw new CacheException(e);
- }
- }
-
- GridH2RowMessage res = new GridH2RowMessage();
-
- res.values(vals);
-
- return res;
- }
+ remove(session);
- /**
- * @param arr Array.
- * @param off Offset.
- * @param cmp Comparator.
- */
- public static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {
- // TODO Optimize: use binary search if the range in array is big.
- for (int i = off, last = arr.length - 1; i < last; i++) {
- if (cmp.compare(arr[i], arr[i + 1]) <= 0)
- break;
-
- U.swap(arr, i, i + 1);
- }
+ database.removeMeta(session, getId());
}
/**
@@ -561,7 +171,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @return Segment ID for given row.
*/
@SuppressWarnings("IfMayBeConditional")
- protected int segmentForRow(SearchRow row) {
+ protected int segmentForRow(GridCacheContext ctx, SearchRow row) {
assert row != null;
if (segmentsCount() == 1 || ctx == null)
@@ -584,45 +194,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
- * Find rows for the segments (distributed joins).
- *
- * @param bounds Bounds.
- * @param segment Segment.
- * @param filter Filter.
- * @return Iterator.
- */
- @SuppressWarnings("unchecked")
- public Iterator<H2Row> findForSegment(GridH2RowRangeBounds bounds, int segment,
- BPlusTree.TreeRowClosure<H2Row, H2Row> filter) {
- SearchRow first = toSearchRow(bounds.first());
- SearchRow last = toSearchRow(bounds.last());
-
- IgniteTree t = treeForRead(segment);
-
- try {
- GridCursor<H2Row> range = ((BPlusTree)t).find(first, last, filter, null);
-
- if (range == null)
- range = H2Utils.EMPTY_CURSOR;
-
- H2Cursor cur = new H2Cursor(range);
-
- return new CursorIteratorWrapper(cur);
- }
- catch (IgniteCheckedException e) {
- throw DbException.convert(e);
- }
- }
-
- /**
- * @param segment Segment Id.
- * @return Snapshot for requested segment if there is one.
- */
- protected <K, V> IgniteTree<K, V> treeForRead(int segment) {
- throw new UnsupportedOperationException();
- }
-
- /**
* Re-assign column ids after removal of column(s).
*/
public void refreshColumnIds() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
index 632d72a..86051e4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.opt.join;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.h2.index.Cursor;
import org.h2.result.Row;
@@ -30,6 +31,7 @@ import java.util.Map;
/**
* Merge cursor from multiple nodes.
*/
+@SuppressWarnings("ComparatorNotSerializable")
public class BroadcastCursor implements Cursor, Comparator<RangeStream> {
/** Index. */
private final GridH2IndexBase idx;
@@ -122,7 +124,7 @@ public class BroadcastCursor implements Cursor, Comparator<RangeStream> {
}
// Bubble up current min stream with respect to fetched row to achieve correct sort order of streams.
- GridH2IndexBase.bubbleUp(streams, off, this);
+ H2Utils.bubbleUp(streams, off, this);
return true;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
index e5eea1d..6bbd968 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
@@ -48,17 +48,8 @@ import org.h2.table.TableView;
* Collocation model for a query.
*/
public final class CollocationModel {
- /** */
- public static final int MULTIPLIER_COLLOCATED = 1;
-
- /** */
- private static final int MULTIPLIER_UNICAST = 50;
-
- /** */
- private static final int MULTIPLIER_BROADCAST = 200;
-
- /** */
- private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
+ /** Empty filter array. */
+ private static final TableFilter[] EMPTY_FILTERS = new TableFilter[0];
/** */
private final CollocationModel upper;
@@ -70,10 +61,10 @@ public final class CollocationModel {
private final boolean view;
/** */
- private int multiplier;
+ private CollocationModelMultiplier multiplier;
/** */
- private Type type;
+ private CollocationModelType type;
/** */
private CollocationModel[] children;
@@ -236,7 +227,7 @@ public final class CollocationModel {
// Reset results.
type = null;
- multiplier = 0;
+ multiplier = null;
return true;
}
@@ -254,14 +245,14 @@ public final class CollocationModel {
boolean collocated = true;
boolean partitioned = false;
- int maxMultiplier = MULTIPLIER_COLLOCATED;
+ CollocationModelMultiplier maxMultiplier = CollocationModelMultiplier.COLLOCATED;
for (int i = 0; i < childFilters.length; i++) {
CollocationModel child = child(i, true);
- Type t = child.type(true);
+ CollocationModelType t = child.type(true);
- if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+ if (child.multiplier == CollocationModelMultiplier.REPLICATED_NOT_LAST)
maxMultiplier = child.multiplier;
if (t.isPartitioned()) {
@@ -270,19 +261,19 @@ public final class CollocationModel {
if (!t.isCollocated()) {
collocated = false;
- int m = child.multiplier(true);
+ CollocationModelMultiplier m = child.multiplier(true);
- if (m > maxMultiplier) {
+ if (m.multiplier() > maxMultiplier.multiplier()) {
maxMultiplier = m;
- if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+ if (maxMultiplier == CollocationModelMultiplier.REPLICATED_NOT_LAST)
break;
}
}
}
}
- type = Type.of(partitioned, collocated);
+ type = CollocationModelType.of(partitioned, collocated);
multiplier = maxMultiplier;
}
else {
@@ -294,8 +285,8 @@ public final class CollocationModel {
// Only partitioned tables will do distributed joins.
if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) {
- type = Type.REPLICATED;
- multiplier = MULTIPLIER_COLLOCATED;
+ type = CollocationModelType.REPLICATED;
+ multiplier = CollocationModelMultiplier.COLLOCATED;
return;
}
@@ -303,29 +294,29 @@ public final class CollocationModel {
// If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables
// which will need to get remote result (if there is no affinity condition). Since this query is broadcasted
// to all the affinity nodes the "base" does not need to get remote results.
- if (!upper.findPartitionedTableBefore(filter)) {
- type = Type.PARTITIONED_COLLOCATED;
- multiplier = MULTIPLIER_COLLOCATED;
+ if (!upper.isPartitionedTableBeforeExists(filter)) {
+ type = CollocationModelType.PARTITIONED_COLLOCATED;
+ multiplier = CollocationModelMultiplier.COLLOCATED;
}
else {
// It is enough to make sure that our previous join by affinity key is collocated, then we are
// collocated. If we at least have affinity key condition, then we do unicast which is cheaper.
switch (upper.joinedWithCollocated(filter)) {
case COLLOCATED_JOIN:
- type = Type.PARTITIONED_COLLOCATED;
- multiplier = MULTIPLIER_COLLOCATED;
+ type = CollocationModelType.PARTITIONED_COLLOCATED;
+ multiplier = CollocationModelMultiplier.COLLOCATED;
break;
case HAS_AFFINITY_CONDITION:
- type = Type.PARTITIONED_NOT_COLLOCATED;
- multiplier = MULTIPLIER_UNICAST;
+ type = CollocationModelType.PARTITIONED_NOT_COLLOCATED;
+ multiplier = CollocationModelMultiplier.UNICAST;
break;
case NONE:
- type = Type.PARTITIONED_NOT_COLLOCATED;
- multiplier = MULTIPLIER_BROADCAST;
+ type = CollocationModelType.PARTITIONED_NOT_COLLOCATED;
+ multiplier = CollocationModelMultiplier.BROADCAST;
break;
@@ -334,18 +325,20 @@ public final class CollocationModel {
}
}
- if (upper.previousReplicated(filter))
- multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
+ if (upper.isPreviousTableReplicated(filter))
+ multiplier = CollocationModelMultiplier.REPLICATED_NOT_LAST;
}
}
/**
- * @param f Current filter.
- * @return {@code true} If partitioned table was found.
+ * Check whether at least one PARTITIONED table is located before current table.
+ *
+ * @param filterIdx Current filter index.
+ * @return {@code true} If PARTITIONED table exists.
*/
- private boolean findPartitionedTableBefore(int f) {
- for (int i = 0; i < f; i++) {
- CollocationModel child = child(i, true);
+ private boolean isPartitionedTableBeforeExists(int filterIdx) {
+ for (int idx = 0; idx < filterIdx; idx++) {
+ CollocationModel child = child(idx, true);
// The c can be null if it is not a GridH2Table and not a sub-query,
// it is a some kind of function table or anything else that considered replicated.
@@ -354,19 +347,26 @@ public final class CollocationModel {
}
// We have to search globally in upper queries as well.
- return upper != null && upper.findPartitionedTableBefore(filter);
+ return upper != null && upper.isPartitionedTableBeforeExists(filter);
}
/**
- * @param f Current filter.
+ * Check if previous table in the sequence is REPLICATED.
+ *
+ * @param filterIdx Current filter index.
* @return {@code true} If previous table is REPLICATED.
*/
- @SuppressWarnings("SimplifiableIfStatement")
- private boolean previousReplicated(int f) {
- if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
+ private boolean isPreviousTableReplicated(int filterIdx) {
+ // We are at the first table, nothing exists before it
+ if (filterIdx == 0)
+ return false;
+
+ CollocationModel child = child(filterIdx - 1, true);
+
+ if (child != null && child.type(true) == CollocationModelType.REPLICATED)
return true;
- return upper != null && upper.previousReplicated(filter);
+ return upper != null && upper.isPreviousTableReplicated(filter);
}
/**
@@ -374,7 +374,7 @@ public final class CollocationModel {
* @return Affinity join type.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private Affinity joinedWithCollocated(int f) {
+ private CollocationModelAffinity joinedWithCollocated(int f) {
TableFilter tf = childFilters[f];
GridH2Table tbl = (GridH2Table)tf.getTable();
@@ -424,10 +424,10 @@ public final class CollocationModel {
// the found affinity column is the needed one, since we can select multiple
// different affinity columns from different tables.
if (cm != null && !cm.view) {
- Type t = cm.type(true);
+ CollocationModelType t = cm.type(true);
if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate))
- return Affinity.COLLOCATED_JOIN;
+ return CollocationModelAffinity.COLLOCATED_JOIN;
}
}
}
@@ -435,7 +435,7 @@ public final class CollocationModel {
}
}
- return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE;
+ return affKeyCondFound ? CollocationModelAffinity.HAS_AFFINITY_CONDITION : CollocationModelAffinity.NONE;
}
/**
@@ -457,6 +457,7 @@ public final class CollocationModel {
* @param validate Query validation flag.
* @return {@code true} It it is an affinity column.
*/
+ @SuppressWarnings("IfMayBeConditional")
private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) {
Column col = expCol.getColumn();
@@ -519,21 +520,23 @@ public final class CollocationModel {
* @return Multiplier.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private int multiplier(boolean withUnion) {
+ private CollocationModelMultiplier multiplier(boolean withUnion) {
calculate();
- assert multiplier != 0;
+ assert multiplier != null;
if (withUnion && unions != null) {
- int maxMultiplier = 0;
+ CollocationModelMultiplier maxMultiplier = null;
for (int i = 0; i < unions.size(); i++) {
- int m = unions.get(i).multiplier(false);
+ CollocationModelMultiplier m = unions.get(i).multiplier(false);
- if (m > maxMultiplier)
+ if (maxMultiplier == null || m.multiplier() > maxMultiplier.multiplier())
maxMultiplier = m;
}
+ assert maxMultiplier != null;
+
return maxMultiplier;
}
@@ -544,26 +547,26 @@ public final class CollocationModel {
* @param withUnion With respect to union.
* @return Type.
*/
- private Type type(boolean withUnion) {
+ private CollocationModelType type(boolean withUnion) {
calculate();
assert type != null;
if (withUnion && unions != null) {
- Type left = unions.get(0).type(false);
+ CollocationModelType left = unions.get(0).type(false);
for (int i = 1; i < unions.size(); i++) {
- Type right = unions.get(i).type(false);
+ CollocationModelType right = unions.get(i).type(false);
if (!left.isCollocated() || !right.isCollocated()) {
- left = Type.PARTITIONED_NOT_COLLOCATED;
+ left = CollocationModelType.PARTITIONED_NOT_COLLOCATED;
break;
}
else if (!left.isPartitioned() && !right.isPartitioned())
- left = Type.REPLICATED;
+ left = CollocationModelType.REPLICATED;
else
- left = Type.PARTITIONED_COLLOCATED;
+ left = CollocationModelType.PARTITIONED_COLLOCATED;
}
return left;
@@ -577,6 +580,7 @@ public final class CollocationModel {
* @param create Create child if needed.
* @return Child collocation.
*/
+ @SuppressWarnings("IfMayBeConditional")
private CollocationModel child(int i, boolean create) {
CollocationModel child = children[i];
@@ -625,18 +629,22 @@ public final class CollocationModel {
/**
* Get distributed multiplier for the given sequence of tables.
*
- * @param ctx Splitter context.
* @param ses Session.
* @param filters Filters.
* @param filter Filter index.
* @return Multiplier.
*/
- public static int distributedMultiplier(
- SplitterContext ctx,
- Session ses,
- TableFilter[] filters,
- int filter
- ) {
+ public static CollocationModelMultiplier distributedMultiplier(Session ses, TableFilter[] filters, int filter) {
+ // Notice that we check for isJoinBatchEnabled, because we can do multiple different
+ // optimization passes on PREPARE stage.
+ // Query expressions can not be distributed as well.
+ SplitterContext ctx = SplitterContext.get();
+
+ if (!ctx.distributedJoins() || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
+ return CollocationModelMultiplier.COLLOCATED;
+
+ assert filters != null;
+
clearViewIndexCache(ses);
CollocationModel model = buildCollocationModel(ctx, ses.getSubQueryInfo(), filters, filter, false);
@@ -717,9 +725,9 @@ public final class CollocationModel {
public static boolean isCollocated(Query qry) {
CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true);
- Type type = mdl.type(true);
+ CollocationModelType type = mdl.type(true);
- if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+ if (!type.isCollocated() && mdl.multiplier == CollocationModelMultiplier.REPLICATED_NOT_LAST)
throw new CacheException("Failed to execute query: for distributed join " +
"all REPLICATED caches must be at the end of the joined tables list.");
@@ -763,7 +771,7 @@ public final class CollocationModel {
for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin())
list.add(f);
- TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
+ TableFilter[] filters = list.toArray(EMPTY_FILTERS);
CollocationModel cm = createChildModel(upper, filter, unions, true, validate);
@@ -803,75 +811,4 @@ public final class CollocationModel {
if (!viewIdxCache.isEmpty())
viewIdxCache.clear();
}
-
- /**
- * Collocation type.
- */
- private enum Type {
- /** */
- PARTITIONED_COLLOCATED(true, true),
-
- /** */
- PARTITIONED_NOT_COLLOCATED(true, false),
-
- /** */
- REPLICATED(false, true);
-
- /** */
- private final boolean partitioned;
-
- /** */
- private final boolean collocated;
-
- /**
- * @param partitioned Partitioned.
- * @param collocated Collocated.
- */
- Type(boolean partitioned, boolean collocated) {
- this.partitioned = partitioned;
- this.collocated = collocated;
- }
-
- /**
- * @return {@code true} If partitioned.
- */
- public boolean isPartitioned() {
- return partitioned;
- }
-
- /**
- * @return {@code true} If collocated.
- */
- public boolean isCollocated() {
- return collocated;
- }
-
- /**
- * @param partitioned Partitioned.
- * @param collocated Collocated.
- * @return Type.
- */
- static Type of(boolean partitioned, boolean collocated) {
- if (collocated)
- return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED;
-
- assert partitioned;
-
- return Type.PARTITIONED_NOT_COLLOCATED;
- }
- }
-
- /**
- * Affinity of a table relative to previous joined tables.
- */
- private enum Affinity {
- /** */
- NONE,
-
- /** */
- HAS_AFFINITY_CONDITION,
-
- /** */
- COLLOCATED_JOIN
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelAffinity.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelAffinity.java
new file mode 100644
index 0000000..07740ad
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelAffinity.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Affinity of a table relative to previous joined tables.
+ */
+public enum CollocationModelAffinity {
+ /** */
+ NONE,
+
+ /** */
+ HAS_AFFINITY_CONDITION,
+
+ /** */
+ COLLOCATED_JOIN
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelMultiplier.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelMultiplier.java
new file mode 100644
index 0000000..74b76a3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelMultiplier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Multiplier for different collocation types.
+ */
+public enum CollocationModelMultiplier {
+ /** Tables are collocated, cheap. */
+ COLLOCATED(1),
+
+ /** */
+ UNICAST(50),
+
+ /** */
+ BROADCAST(200),
+
+ /** Force REPLICATED tables to be at the end of join sequence. */
+ REPLICATED_NOT_LAST(10_000);
+
+ /** Multiplier value. */
+ private final int multiplier;
+
+ /**
+ * Constructor.
+ *
+ * @param multiplier Multiplier value.
+ */
+ CollocationModelMultiplier(int multiplier) {
+ this.multiplier = multiplier;
+ }
+
+ /**
+ * @return Multiplier value.
+ */
+ public int multiplier() {
+ return multiplier;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelType.java
new file mode 100644
index 0000000..ea6b10e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Collocation type.
+ */
+public enum CollocationModelType {
+ /** */
+ PARTITIONED_COLLOCATED(true, true),
+
+ /** */
+ PARTITIONED_NOT_COLLOCATED(true, false),
+
+ /** */
+ REPLICATED(false, true);
+
+ /** */
+ private final boolean partitioned;
+
+ /** */
+ private final boolean collocated;
+
+ /**
+ * @param partitioned Partitioned.
+ * @param collocated Collocated.
+ */
+ CollocationModelType(boolean partitioned, boolean collocated) {
+ this.partitioned = partitioned;
+ this.collocated = collocated;
+ }
+
+ /**
+ * @return {@code true} If partitioned.
+ */
+ public boolean isPartitioned() {
+ return partitioned;
+ }
+
+ /**
+ * @return {@code true} If collocated.
+ */
+ public boolean isCollocated() {
+ return collocated;
+ }
+
+ /**
+ * @param partitioned Partitioned.
+ * @param collocated Collocated.
+ * @return Type.
+ */
+ public static CollocationModelType of(boolean partitioned, boolean collocated) {
+ if (collocated)
+ return partitioned ? PARTITIONED_COLLOCATED : REPLICATED;
+
+ assert partitioned;
+
+ return PARTITIONED_NOT_COLLOCATED;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index 26206c9..490858f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -21,7 +21,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -51,8 +51,11 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2R
* Index lookup batch.
*/
public class DistributedLookupBatch implements IndexLookupBatch {
+ /** */
+ private static final Object EXPLICIT_NULL = new Object();
+
/** Index. */
- private final GridH2IndexBase idx;
+ private final H2TreeIndex idx;
/** */
private final GridCacheContext<?,?> cctx;
@@ -89,7 +92,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
* @param ucast Unicast or broadcast query.
* @param affColId Affinity column ID.
*/
- public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
+ public DistributedLookupBatch(H2TreeIndex idx, GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
this.idx = idx;
this.cctx = cctx;
this.ucast = ucast;
@@ -112,7 +115,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
Value affKeyLast = lastRow.getValue(affColId);
if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
- return affKeyFirst == ValueNull.INSTANCE ? GridH2IndexBase.EXPLICIT_NULL : affKeyFirst.getObject();
+ return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : affKeyFirst.getObject();
if (idx.getTable().rowDescriptor().isKeyColumn(affColId))
return null;
@@ -122,7 +125,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
Value pkLast = lastRow.getValue(QueryUtils.KEY_COL);
if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
- return GridH2IndexBase.EXPLICIT_NULL;
+ return EXPLICIT_NULL;
if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
return null;
@@ -176,7 +179,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
if (affKey != null) {
// Affinity key is provided.
- if (affKey == GridH2IndexBase.EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
+ if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
return false;
segmentKeys = F.asList(rangeSegment(affKey, locQry));
@@ -325,7 +328,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
* @return Segment key for Affinity key.
*/
public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) {
- assert affKeyObj != null && affKeyObj != GridH2IndexBase.EXPLICIT_NULL : affKeyObj;
+ assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
ClusterNode node;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
index 405478e..64aa0ca 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt.join;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
@@ -36,7 +36,7 @@ import static java.util.Collections.emptyIterator;
*/
public class RangeSource {
/** Index. */
- private final GridH2IndexBase idx;
+ private final H2TreeIndex idx;
/** */
private Iterator<GridH2RowRangeBounds> boundsIter;
@@ -59,7 +59,7 @@ public class RangeSource {
* @param filter Filter.
*/
public RangeSource(
- GridH2IndexBase idx,
+ H2TreeIndex idx,
Iterable<GridH2RowRangeBounds> bounds,
int segment,
BPlusTree.TreeRowClosure<H2Row, H2Row> filter
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
index b80cd61..6891ff3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
@@ -23,8 +23,8 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -58,7 +58,7 @@ public class RangeStream {
private final GridKernalContext ctx;
/** Index. */
- private final GridH2IndexBase idx;
+ private final H2TreeIndex idx;
/** */
private final DistributedJoinContext joinCtx;
@@ -88,7 +88,7 @@ public class RangeStream {
* @param joinCtx Join context.
* @param node Node.
*/
- public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, DistributedJoinContext joinCtx, ClusterNode node) {
+ public RangeStream(GridKernalContext ctx, H2TreeIndex idx, DistributedJoinContext joinCtx, ClusterNode node) {
this.ctx = ctx;
this.idx = idx;
this.node = node;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index 880e305..482752a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
import org.apache.ignite.internal.util.typedef.F;
@@ -49,7 +50,6 @@ import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyIterator;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase.bubbleUp;
/**
* Sorted index.
@@ -258,7 +258,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
return; // All streams are done.
if (streams[off].next())
- bubbleUp(streams, off, streamCmp);
+ H2Utils.bubbleUp(streams, off, streamCmp);
else
streams[off++] = null; // Move left bound and nullify empty stream.
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8097800..fdfb44c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1209,7 +1209,8 @@ public class GridReduceQueryExecutor {
specialize,
locNodeHnd,
GridIoPolicy.QUERY_POOL,
- runLocParallel);
+ runLocParallel
+ );
}
/**