You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/03 10:46:09 UTC
[ignite] branch master updated: IGNITE-11169: SQL: encapsulated
distributed join information into separate context object to simplify (and
possibly remove in future) GridH2QueryContext object. This closes #6005.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c71e7df IGNITE-11169: SQL: encapsulated distributed join information into separate context object to simplify (and possibly remove in future) GridH2QueryContext object. This closes #6005.
c71e7df is described below
commit c71e7dfed112b57861620270eb31c11226ebf7b9
Author: devozerov <pp...@gmail.com>
AuthorDate: Sun Feb 3 13:45:58 2019 +0300
IGNITE-11169: SQL: encapsulated distributed join information into separate context object to simplify (and possibly remove in future) GridH2QueryContext object. This closes #6005.
---
.../processors/query/h2/IgniteH2Indexing.java | 48 ++--
.../processors/query/h2/opt/GridH2IndexBase.java | 52 ++--
.../query/h2/opt/GridH2QueryContext.java | 290 ++-------------------
.../processors/query/h2/opt/GridH2QueryType.java | 7 +-
.../query/h2/opt/join/CollocationModel.java | 70 +++--
.../query/h2/opt/join/DistributedJoinContext.java | 263 +++++++++++++++++++
.../query/h2/opt/join/DistributedJoinMode.java | 51 ----
.../query/h2/opt/join/DistributedLookupBatch.java | 64 ++---
.../processors/query/h2/opt/join/RangeStream.java | 13 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 43 +++
.../processors/query/h2/sql/SplitterContext.java | 85 ++++++
.../query/h2/twostep/GridMapQueryExecutor.java | 118 ++++-----
.../query/h2/twostep/GridReduceQueryExecutor.java | 13 +-
13 files changed, 610 insertions(+), 507 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1579ec6..5b83088 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -182,9 +182,6 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
/**
@@ -545,8 +542,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
- final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
- .filter(filter).distributedJoinMode(OFF);
+ final GridH2QueryContext ctx = new GridH2QueryContext(
+ nodeId,
+ nodeId,
+ 0,
+ 0,
+ LOCAL,
+ filter
+ );
boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(p);
@@ -691,8 +694,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null,
- detachedConn);
+ return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null, detachedConn);
}
catch (IgniteCheckedException | RuntimeException | Error e) {
detachedConn.recycle();
@@ -1843,8 +1845,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (!hasTwoStep)
return new ParsingResult(prepared, newQry, remainingSql, null, null, null);
- final UUID locNodeId = ctx.localNodeId();
-
// Now we're sure to have a distributed query. Let's try to get a two-step plan from the cache, or perform the
// split if needed.
H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, qry.getSql(),
@@ -1863,28 +1863,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
try {
- GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
- .distributedJoinMode(distributedJoinMode(qry.isLocal(), qry.isDistributedJoins())));
-
- try {
- GridCacheTwoStepQuery twoStepQry = split(prepared, newQry);
+ GridCacheTwoStepQuery twoStepQry = split(prepared, newQry);
- return new ParsingResult(prepared, newQry, remainingSql, twoStepQry,
- cachedQryKey, H2Utils.meta(stmt.getMetaData()));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
- Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
- }
- catch (SQLException e) {
- throw new IgniteSQLException(e);
- }
- finally {
- U.close(stmt, log);
- }
+ return new ParsingResult(prepared, newQry, remainingSql, twoStepQry,
+ cachedQryKey, H2Utils.meta(stmt.getMetaData()));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
+ Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException(e);
}
finally {
- GridH2QueryContext.clearThreadLocal();
+ U.close(stmt, log);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index ab9929d..c53d0ec 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -31,11 +31,13 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2Cursor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
+import org.apache.ignite.internal.processors.query.h2.sql.SplitterContext;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -55,7 +57,6 @@ import org.h2.engine.Session;
import org.h2.index.BaseIndex;
import org.h2.index.IndexCondition;
import org.h2.index.IndexLookupBatch;
-import org.h2.index.ViewIndex;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
@@ -73,10 +74,7 @@ import java.util.Map;
import java.util.UUID;
import static java.util.Collections.singletonList;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel.buildCollocationModel;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
@@ -206,41 +204,23 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param ses Session.
- */
- private static void clearViewIndexCache(Session ses) {
- Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true);
-
- if (!viewIdxCache.isEmpty())
- viewIdxCache.clear();
- }
-
- /**
- * @param ses Session.
* @param filters All joined table filters.
* @param filter Current filter.
* @return Multiplier.
*/
public final int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
- GridH2QueryContext qctx = GridH2QueryContext.get();
-
// We do optimizations with respect to distributed joins only on PREPARE stage only.
// Notice that we check for isJoinBatchEnabled, because we can do multiple different
// optimization passes on PREPARE stage.
// Query expressions can not be distributed as well.
- if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF ||
- !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
- return CollocationModel.MULTIPLIER_COLLOCATED;
+ SplitterContext ctx = SplitterContext.get();
- // We have to clear this cache because normally sub-query plan cost does not depend on anything
- // other than index condition masks and sort order, but in our case it can depend on order
- // of previous table filters.
- clearViewIndexCache(ses);
+ if (!ctx.distributedJoins() || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
+ return CollocationModel.MULTIPLIER_COLLOCATED;
assert filters != null;
- CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false);
-
- return c.calculateMultiplier();
+ return CollocationModel.distributedMultiplier(ctx, ses, filters, filter);
}
/** {@inheritDoc} */
@@ -287,7 +267,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
@Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
GridH2QueryContext qctx = GridH2QueryContext.get();
- if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned())
+ if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
return null;
IndexColumn affCol = getTable().getAffinityKeyColumn();
@@ -387,6 +367,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (qctx == null)
res.status(STATUS_NOT_FOUND);
else {
+ DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+ assert joinCtx != null;
+
try {
RangeSource src;
@@ -398,14 +382,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
else {
// This is request to fetch next portion of data.
- src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId());
+ src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
assert src != null;
}
List<GridH2RowRange> ranges = new ArrayList<>();
- int maxRows = qctx.pageSize();
+ int maxRows = joinCtx.pageSize();
assert maxRows > 0 : maxRows;
@@ -426,11 +410,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (src.hasMoreRows()) {
// Save source for future fetches.
if (msg.bounds() != null)
- qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
+ joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
}
else if (msg.bounds() == null) {
// Drop saved source.
- qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
+ joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
}
res.ranges(ranges);
@@ -466,7 +450,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (qctx == null)
return;
- Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+ DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+ assert joinCtx != null;
+
+ Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());
if (streams == null)
return;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index babced3..40e7edb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -17,26 +17,19 @@
package org.apache.ignite.internal.processors.query.h2.opt;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
-import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
-import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
/**
@@ -53,40 +46,13 @@ public class GridH2QueryContext {
private final QueryContextKey key;
/** */
- private volatile boolean cleared;
-
- /** */
private List<GridReservable> reservations;
- /** Range streams for indexes. */
- private Map<Integer, Object> streams;
-
- /** Range sources for indexes. */
- private Map<SourceKey, Object> sources;
-
- /** */
- private int batchLookupIdGen;
-
- /** */
- private IndexingQueryFilter filter;
-
- /** */
- private AffinityTopologyVersion topVer;
-
/** */
- private Map<UUID, int[]> partsMap;
+ private final IndexingQueryFilter filter;
- /** */
- private UUID[] partsNodes;
-
- /** */
- private DistributedJoinMode distributedJoinMode;
-
- /** */
- private int pageSize;
-
- /** */
- private CollocationModel qryCollocationMdl;
+ /** Distributed join context. */
+ private DistributedJoinContext distributedJoinCtx;
/** */
private MvccSnapshot mvccSnapshot;
@@ -98,29 +64,22 @@ public class GridH2QueryContext {
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
* @param qryId The query ID.
- * @param type Query type.
- */
- public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
- assert type != MAP;
-
- key = new QueryContextKey(locNodeId, nodeId, qryId, 0, type);
- }
-
- /**
- * @param locNodeId Local node ID.
- * @param nodeId The node who initiated the query.
- * @param qryId The query ID.
* @param segmentId Index segment ID.
* @param type Query type.
*/
- public GridH2QueryContext(UUID locNodeId,
+ public GridH2QueryContext(
+ UUID locNodeId,
UUID nodeId,
long qryId,
int segmentId,
- GridH2QueryType type) {
+ GridH2QueryType type,
+ IndexingQueryFilter filter
+ ) {
assert segmentId == 0 || type == MAP;
key = new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type);
+
+ this.filter = filter;
}
/**
@@ -141,61 +100,27 @@ public class GridH2QueryContext {
}
/**
- * @return Type.
- */
- public GridH2QueryType type() {
- return key.type();
- }
-
- /**
- * @return Origin node ID.
- */
- public UUID originNodeId() {
- return key.nodeId();
- }
-
- /**
- * @return Query request ID.
- */
- public long queryId() {
- return key.queryId();
- }
-
- /**
- * @return Query collocation model.
- */
- public CollocationModel queryCollocationModel() {
- return qryCollocationMdl;
- }
-
- /**
- * @param qryCollocationMdl Query collocation model.
+ * @param distributedJoinCtx Distributed join context.
+ * @return This instance for chaining.
*/
- public void queryCollocationModel(CollocationModel qryCollocationMdl) {
- this.qryCollocationMdl = qryCollocationMdl;
- }
-
- /**
- * @param distributedJoinMode Distributed join mode.
- * @return {@code this}.
- */
- public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) {
- this.distributedJoinMode = distributedJoinMode;
+ public GridH2QueryContext distributedJoinContext(@Nullable DistributedJoinContext distributedJoinCtx) {
+ this.distributedJoinCtx = distributedJoinCtx;
return this;
}
/**
- * @return Distributed join mode.
+ * @return Distributed join context.
*/
- public DistributedJoinMode distributedJoinMode() {
- return distributedJoinMode;
+ @Nullable public DistributedJoinContext distributedJoinContext() {
+ return distributedJoinCtx;
}
/**
* @param reservations Reserved partitions or group reservations.
* @return {@code this}.
*/
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public GridH2QueryContext reservations(List<GridReservable> reservations) {
this.reservations = reservations;
@@ -203,148 +128,13 @@ public class GridH2QueryContext {
}
/**
- * @param topVer Topology version.
- * @return {@code this}.
+ * @return index segment ID.
*/
- public GridH2QueryContext topologyVersion(AffinityTopologyVersion topVer) {
- this.topVer = topVer;
-
- return this;
- }
-
- /**
- * @return Topology version.
- */
- public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @param partsMap Partitions map.
- * @return {@code this}.
- */
- public GridH2QueryContext partitionsMap(Map<UUID,int[]> partsMap) {
- this.partsMap = partsMap;
-
- return this;
- }
-
- /**
- * @return Partitions map.
- */
- public Map<UUID,int[]> partitionsMap() {
- return partsMap;
- }
-
- /**
- * @param p Partition.
- * @param cctx Cache context.
- * @return Owning node ID.
- */
- public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
- UUID[] nodeIds = partsNodes;
-
- if (nodeIds == null) {
- assert partsMap != null;
-
- nodeIds = new UUID[cctx.affinity().partitions()];
-
- for (Map.Entry<UUID, int[]> e : partsMap.entrySet()) {
- UUID nodeId = e.getKey();
- int[] nodeParts = e.getValue();
-
- assert nodeId != null;
- assert !F.isEmpty(nodeParts);
-
- for (int part : nodeParts) {
- assert nodeIds[part] == null;
-
- nodeIds[part] = nodeId;
- }
- }
-
- partsNodes = nodeIds;
- }
-
- return nodeIds[p];
- }
-
- /** @return index segment ID. */
public int segment() {
return key.segmentId();
}
/**
- * @param batchLookupId Batch lookup ID.
- * @param streams Range streams.
- */
- public synchronized void putStreams(int batchLookupId, Object streams) {
- if (this.streams == null) {
- if (streams == null)
- return;
-
- this.streams = new HashMap<>();
- }
-
- if (streams == null)
- this.streams.remove(batchLookupId);
- else
- this.streams.put(batchLookupId, streams);
- }
-
- /**
- * @param batchLookupId Batch lookup ID.
- * @return Range streams.
- */
- @SuppressWarnings("unchecked")
- public synchronized <T> T getStreams(int batchLookupId) {
- if (streams == null)
- return null;
-
- return (T)streams.get(batchLookupId);
- }
-
- /**
- * @param ownerId Owner node ID.
- * @param segmentId Index segment ID.
- * @param batchLookupId Batch lookup ID.
- * @param src Range source.
- */
- public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
- SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
-
- if (src != null) {
- if (sources == null)
- sources = new HashMap<>();
-
- sources.put(srcKey, src);
- }
- else if (sources != null)
- sources.remove(srcKey);
- }
-
- /**
- * @param ownerId Owner node ID.
- * @param segmentId Index segment ID.
- * @param batchLookupId Batch lookup ID.
- * @return Range source.
- */
- @SuppressWarnings("unchecked")
- public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
- if (sources == null)
- return null;
-
- return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
- }
-
- /**
- * @return Next batch ID.
- */
- public int nextBatchLookupId() {
- return ++batchLookupIdGen;
- }
-
- /**
* Sets current thread local context. This method must be called when all the non-volatile properties are
* already set to ensure visibility for other threads.
*
@@ -354,7 +144,7 @@ public class GridH2QueryContext {
assert qctx.get() == null;
// We need MAP query context to be available to other threads to run distributed joins.
- if (x.key.type() == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
+ if (x.key.type() == MAP && x.distributedJoinContext() != null && qctxs.putIfAbsent(x.key, x) != null)
throw new IllegalStateException("Query context is already set.");
qctx.set(x);
@@ -421,7 +211,8 @@ public class GridH2QueryContext {
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
public void clearContext(boolean nodeStop) {
- cleared = true;
+ if (distributedJoinCtx != null)
+ distributedJoinCtx.cancel();
List<GridReservable> r = reservations;
@@ -432,13 +223,6 @@ public class GridH2QueryContext {
}
/**
- * @return {@code true} If the context is cleared.
- */
- public boolean isCleared() {
- return cleared;
- }
-
- /**
* @param locNodeId Local node ID.
* @param nodeId Dead node ID.
*/
@@ -496,33 +280,6 @@ public class GridH2QueryContext {
}
/**
- * @param filter Filter.
- * @return {@code this}.
- */
- public GridH2QueryContext filter(IndexingQueryFilter filter) {
- this.filter = filter;
-
- return this;
- }
-
- /**
- * @return Page size.
- */
- public int pageSize() {
- return pageSize;
- }
-
- /**
- * @param pageSize Page size.
- * @return {@code this}.
- */
- public GridH2QueryContext pageSize(int pageSize) {
- this.pageSize = pageSize;
-
- return this;
- }
-
- /**
* @return Lazy worker, if any, or {@code null} if none.
*/
public MapQueryLazyWorker lazyWorker() {
@@ -543,5 +300,4 @@ public class GridH2QueryContext {
@Override public String toString() {
return S.toString(GridH2QueryContext.class, this);
}
-
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
index f6d0408..ae0b053 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
@@ -40,10 +40,5 @@ public enum GridH2QueryType {
* Replicated query over a network. Such a query can be sent from a client node or node which
* did not load all the partitions yet.
*/
- REPLICATED,
-
- /**
- * Parsing and optimization stage.
- */
- PREPARE,
+ REPLICATED;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
index 03653c7..e5eea1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
@@ -20,16 +20,18 @@ package org.apache.ignite.internal.processors.query.h2.opt.join;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import javax.cache.CacheException;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.processors.query.h2.sql.SplitterContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.h2.command.dml.Query;
import org.h2.command.dml.Select;
import org.h2.command.dml.SelectUnion;
+import org.h2.engine.Session;
import org.h2.expression.Comparison;
import org.h2.expression.Expression;
import org.h2.expression.ExpressionColumn;
@@ -513,14 +515,6 @@ public final class CollocationModel {
}
/**
- * @return Multiplier.
- */
- public int calculateMultiplier() {
- // We don't need multiplier for union here because it will be summarized in H2.
- return multiplier(false);
- }
-
- /**
* @param withUnion With respect to union.
* @return Multiplier.
*/
@@ -629,29 +623,56 @@ public final class CollocationModel {
}
/**
- * @param qctx Query context.
+ * Get distributed multiplier for the given sequence of tables.
+ *
+ * @param ctx Splitter context.
+ * @param ses Session.
+ * @param filters Filters.
+ * @param filter Filter index.
+ * @return Multiplier.
+ */
+ public static int distributedMultiplier(
+ SplitterContext ctx,
+ Session ses,
+ TableFilter[] filters,
+ int filter
+ ) {
+ clearViewIndexCache(ses);
+
+ CollocationModel model = buildCollocationModel(ctx, ses.getSubQueryInfo(), filters, filter, false);
+
+ return model.multiplier(false);
+ }
+
+ /**
+ * @param ctx Splitter context.
* @param info Sub-query info.
* @param filters Filters.
* @param filter Filter.
* @param validate Query validation flag.
* @return Collocation.
*/
- public static CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
- TableFilter[] filters, int filter, boolean validate) {
+ private static CollocationModel buildCollocationModel(
+ SplitterContext ctx,
+ SubQueryInfo info,
+ TableFilter[] filters,
+ int filter,
+ boolean validate
+ ) {
CollocationModel cm;
if (info != null) {
// Go up until we reach the root query.
- cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate);
+ cm = buildCollocationModel(ctx, info.getUpper(), info.getFilters(), info.getFilter(), validate);
}
else {
// We are at the root query.
- cm = qctx.queryCollocationModel();
+ cm = ctx.collocationModel();
if (cm == null) {
cm = createChildModel(null, -1, null, true, validate);
- qctx.queryCollocationModel(cm);
+ ctx.collocationModel(cm);
}
}
@@ -713,11 +734,13 @@ public final class CollocationModel {
* @param validate Query validation flag.
* @return Built model.
*/
- private static CollocationModel buildCollocationModel(CollocationModel upper,
+ private static CollocationModel buildCollocationModel(
+ CollocationModel upper,
int filter,
Query qry,
List<CollocationModel> unions,
- boolean validate) {
+ boolean validate
+ ) {
if (qry.isUnion()) {
if (unions == null)
unions = new ArrayList<>();
@@ -769,6 +792,19 @@ public final class CollocationModel {
}
/**
+ * @param ses Session.
+ */
+ private static void clearViewIndexCache(Session ses) {
+ // We have to clear this cache because normally sub-query plan cost does not depend on anything
+ // other than index condition masks and sort order, but in our case it can depend on order
+ // of previous table filters.
+ Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true);
+
+ if (!viewIdxCache.isEmpty())
+ viewIdxCache.clear();
+ }
+
+ /**
* Collocation type.
*/
private enum Type {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
new file mode 100644
index 0000000..3960602
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Context for distributed joins.
+ */
+public class DistributedJoinContext {
+ /** Local flag. */
+ private final boolean loc;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final Map<UUID, int[]> partsMap;
+
+ /** */
+ private final UUID originNodeId;
+
+ /** */
+ private final long qryId;
+
+ /** */
+ private final int segment;
+
+ /** */
+ private final int pageSize;
+
+ /** Range streams for indexes. */
+ private Map<Integer, Object> streams;
+
+ /** Range sources for indexes. */
+ private Map<SourceKey, Object> sources;
+
+ /** */
+ private int batchLookupIdGen;
+
+ /** */
+ private UUID[] partsNodes;
+
+ /** */
+ private boolean cancelled;
+
+ /**
+ * Constructor.
+ *
+ * @param loc Local flag.
+ * @param topVer Topology version.
+ * @param partsMap Partitions map.
+ * @param originNodeId ID of the node started the query.
+ * @param qryId Query ID.
+ * @param segment Segment.
+ * @param pageSize Page size.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public DistributedJoinContext(boolean loc, AffinityTopologyVersion topVer, Map<UUID, int[]> partsMap,
+ UUID originNodeId, long qryId, int segment, int pageSize) {
+ this.loc = loc;
+ this.topVer = topVer;
+ this.partsMap = partsMap;
+ this.originNodeId = originNodeId;
+ this.qryId = qryId;
+ this.segment = segment;
+ this.pageSize = pageSize;
+ }
+
+ /**
+ * @return Local flag.
+ */
+ public boolean local() {
+ return loc;
+ }
+
+ /**
+ * @return Affinity topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Partitions map.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public Map<UUID,int[]> partitionsMap() {
+ return partsMap;
+ }
+
+ /**
+ * @return Origin node ID.
+ */
+ public UUID originNodeId() {
+ return originNodeId;
+ }
+
+ /**
+ * @return Query request ID.
+ */
+ public long queryId() {
+ return qryId;
+ }
+
+ /**
+ * @return index segment ID.
+ */
+ public int segment() {
+ return segment;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @param p Partition.
+ * @param cctx Cache context.
+ * @return Owning node ID.
+ */
+ public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
+ UUID[] nodeIds = partsNodes;
+
+ if (nodeIds == null) {
+ assert partsMap != null;
+
+ nodeIds = new UUID[cctx.affinity().partitions()];
+
+ for (Map.Entry<UUID, int[]> e : partsMap.entrySet()) {
+ UUID nodeId = e.getKey();
+ int[] nodeParts = e.getValue();
+
+ assert nodeId != null;
+ assert !F.isEmpty(nodeParts);
+
+ for (int part : nodeParts) {
+ assert nodeIds[part] == null;
+
+ nodeIds[part] = nodeId;
+ }
+ }
+
+ partsNodes = nodeIds;
+ }
+
+ return nodeIds[p];
+ }
+
+ /**
+ * @param batchLookupId Batch lookup ID.
+ * @param streams Range streams.
+ */
+ public synchronized void putStreams(int batchLookupId, Object streams) {
+ if (this.streams == null) {
+ if (streams == null)
+ return;
+
+ this.streams = new HashMap<>();
+ }
+
+ if (streams == null)
+ this.streams.remove(batchLookupId);
+ else
+ this.streams.put(batchLookupId, streams);
+ }
+
+ /**
+ * @param batchLookupId Batch lookup ID.
+ * @return Range streams.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized <T> T getStreams(int batchLookupId) {
+ if (streams == null)
+ return null;
+
+ return (T)streams.get(batchLookupId);
+ }
+
+ /**
+ * @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
+ * @param batchLookupId Batch lookup ID.
+ * @param src Range source.
+ */
+ public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
+ SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
+
+ if (src != null) {
+ if (sources == null)
+ sources = new HashMap<>();
+
+ sources.put(srcKey, src);
+ }
+ else if (sources != null)
+ sources.remove(srcKey);
+ }
+
+ /**
+ * @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
+ * @param batchLookupId Batch lookup ID.
+ * @return Range source.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
+ if (sources == null)
+ return null;
+
+ return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
+ }
+
+ /**
+ * @return Next batch ID.
+ */
+ public int nextBatchLookupId() {
+ return ++batchLookupIdGen;
+ }
+
+ /**
+ * @return Cancelled flag.
+ */
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ /**
+ * Mark as cancelled.
+ */
+ public void cancel() {
+ cancelled = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedJoinContext.class, this);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
deleted file mode 100644
index 7c958da..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt.join;
-
-/**
- * Defines set of distributed join modes.
- */
-public enum DistributedJoinMode {
- /**
- * Distributed joins is disabled. Local joins will be performed instead.
- */
- OFF,
-
- /**
- * Distributed joins is enabled within local node only.
- *
- * NOTE: This mode is used with segmented indices for local sql queries.
- * As in this case we need to make distributed join across local index segments
- * and prevent range-queries to other nodes.
- */
- LOCAL_ONLY,
-
- /**
- * Distributed joins is enabled.
- */
- ON;
-
- /**
- * @param isLocal Query local flag.
- * @param distributedJoins Query distributed joins flag.
- * @return DistributedJoinMode for the query.
- */
- public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
- return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
- }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index 5411a4a..26206c9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -44,7 +44,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
@@ -64,8 +63,8 @@ public class DistributedLookupBatch implements IndexLookupBatch {
/** */
private final int affColId;
- /** */
- private GridH2QueryContext qctx;
+ /** Join context. */
+ private DistributedJoinContext joinCtx;
/** */
private int batchLookupId;
@@ -143,27 +142,29 @@ public class DistributedLookupBatch implements IndexLookupBatch {
/** {@inheritDoc} */
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
@Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
- if (qctx == null || findCalled) {
- if (qctx == null) {
+ if (joinCtx == null || findCalled) {
+ if (joinCtx == null) {
// It is the first call after query begin (may be after reuse),
// reinitialize query context and result.
- qctx = GridH2QueryContext.get();
+ GridH2QueryContext qctx = GridH2QueryContext.get();
res = new ArrayList<>();
assert qctx != null;
assert !findCalled;
+
+ joinCtx = qctx.distributedJoinContext();
}
else {
// Cleanup after the previous lookup phase.
assert batchLookupId != 0;
findCalled = false;
- qctx.putStreams(batchLookupId, null);
+ joinCtx.putStreams(batchLookupId, null);
res.clear();
}
// Reinitialize for the next lookup phase.
- batchLookupId = qctx.nextBatchLookupId();
+ batchLookupId = joinCtx.nextBatchLookupId();
rangeStreams = new HashMap<>();
}
@@ -212,9 +213,9 @@ public class DistributedLookupBatch implements IndexLookupBatch {
List<GridH2RowRangeBounds> bounds;
if (stream == null) {
- stream = new RangeStream(cctx.kernalContext(), idx, qctx, segmentKey.node());
+ stream = new RangeStream(cctx.kernalContext(), idx, joinCtx, segmentKey.node());
- stream.request(createRequest(qctx, batchLookupId, segmentKey.segmentId()));
+ stream.request(createRequest(joinCtx, batchLookupId, segmentKey.segmentId()));
stream.request().bounds(bounds = new ArrayList<>());
rangeStreams.put(segmentKey, stream);
@@ -225,7 +226,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
bounds.add(rangeBounds);
// If at least one node will have a full batch then we are ok.
- if (bounds.size() >= qctx.pageSize())
+ if (bounds.size() >= joinCtx.pageSize())
batchFull = true;
}
@@ -260,9 +261,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
* @return {@code True} if local query execution is enforced.
*/
private boolean localQuery() {
- assert qctx != null : "Missing query context: " + this;
-
- return qctx.distributedJoinMode() == LOCAL_ONLY;
+ return joinCtx != null && joinCtx.local();
}
/**
@@ -275,7 +274,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
return;
}
- qctx.putStreams(batchLookupId, rangeStreams);
+ joinCtx.putStreams(batchLookupId, rangeStreams);
// Start streaming.
for (RangeStream stream : rangeStreams.values()) {
@@ -296,14 +295,16 @@ public class DistributedLookupBatch implements IndexLookupBatch {
/** {@inheritDoc} */
@Override public void reset(boolean beforeQry) {
- if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called.
+ if (beforeQry || joinCtx == null) // Query context can be null if addSearchRows was never called.
return;
assert batchLookupId != 0;
// Do cleanup after the query run.
- qctx.putStreams(batchLookupId, null);
- qctx = null; // The same query can be reused multiple times for different query contexts.
+ joinCtx.putStreams(batchLookupId, null);
+
+ joinCtx = null; // The same query can be reused multiple times for different query contexts.
+
batchLookupId = 0;
rangeStreams = Collections.emptyMap();
@@ -331,28 +332,28 @@ public class DistributedLookupBatch implements IndexLookupBatch {
int partition = cctx.affinity().partition(affKeyObj);
if (isLocalQry) {
- if (qctx.partitionsMap() != null) {
+ if (joinCtx.partitionsMap() != null) {
// If we have explicit partitions map, we have to use it to calculate affinity node.
- UUID nodeId = qctx.nodeForPartition(partition, cctx);
+ UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
if(!cctx.localNodeId().equals(nodeId))
return null; // Prevent remote index call for local queries.
}
- if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+ if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, joinCtx.topologyVersion()))
return null;
node = cctx.localNode();
}
else{
- if (qctx.partitionsMap() != null) {
+ if (joinCtx.partitionsMap() != null) {
// If we have explicit partitions map, we have to use it to calculate affinity node.
- UUID nodeId = qctx.nodeForPartition(partition, cctx);
+ UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
node = cctx.discovery().node(nodeId);
}
else // Get primary node for current topology version.
- node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
+ node = cctx.affinity().primaryByKey(affKeyObj, joinCtx.topologyVersion());
if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
throw H2Utils.retryException("Failed to get primary node by key for range segment.");
@@ -366,7 +367,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
* @return Collection of nodes for broadcasting.
*/
public List<SegmentKey> broadcastSegments(boolean isLocalQry) {
- Map<UUID, int[]> partMap = qctx.partitionsMap();
+ Map<UUID, int[]> partMap = joinCtx.partitionsMap();
List<ClusterNode> nodes;
@@ -378,7 +379,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
}
else {
if (partMap == null)
- nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+ nodes = new ArrayList<>(CU.affinityNodes(cctx, joinCtx.topologyVersion()));
else {
nodes = new ArrayList<>(partMap.size());
@@ -411,17 +412,18 @@ public class DistributedLookupBatch implements IndexLookupBatch {
}
/**
- * @param qctx Query context.
+ * @param joinCtx Join context.
* @param batchLookupId Batch lookup ID.
* @param segmentId Segment ID.
* @return Index range request.
*/
- public static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
+ public static GridH2IndexRangeRequest createRequest(DistributedJoinContext joinCtx, int batchLookupId,
+ int segmentId) {
GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
- req.originNodeId(qctx.originNodeId());
- req.queryId(qctx.queryId());
- req.originSegmentId(qctx.segment());
+ req.originNodeId(joinCtx.originNodeId());
+ req.queryId(joinCtx.queryId());
+ req.originSegmentId(joinCtx.segment());
req.segment(segmentId);
req.batchLookupId(batchLookupId);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
index 0684089..b80cd61 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -62,7 +61,7 @@ public class RangeStream {
private final GridH2IndexBase idx;
/** */
- private final GridH2QueryContext qctx;
+ private final DistributedJoinContext joinCtx;
/** */
private final ClusterNode node;
@@ -86,14 +85,14 @@ public class RangeStream {
private int cursorRangeId = -1;
/**
- * @param qctx Query context.
+ * @param joinCtx Join context.
* @param node Node.
*/
- public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, GridH2QueryContext qctx, ClusterNode node) {
+ public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, DistributedJoinContext joinCtx, ClusterNode node) {
this.ctx = ctx;
this.idx = idx;
this.node = node;
- this.qctx = qctx;
+ this.joinCtx = joinCtx;
}
/**
@@ -137,7 +136,7 @@ public class RangeStream {
final long start = U.currentTimeMillis();
for (int attempt = 0;; attempt++) {
- if (qctx.isCleared())
+ if (joinCtx.isCancelled())
throw H2Utils.retryException("Query is cancelled.");
if (ctx.isStopping())
@@ -164,7 +163,7 @@ public class RangeStream {
if (remainingRanges > 0) {
if (req.bounds() != null)
- req = DistributedLookupBatch.createRequest(qctx, req.batchLookupId(), req.segment());
+ req = DistributedLookupBatch.createRequest(joinCtx, req.batchLookupId(), req.segment());
// Prefetch next page.
idx.send(singletonList(node), req);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 741f859..28166d5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -129,6 +129,7 @@ public class GridSqlQuerySplitter {
* @param distributedJoins Distributed joins flag.
* @param extractor Partition extractor.
*/
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, boolean distributedJoins,
PartitionExtractor extractor) {
this.params = params;
@@ -183,6 +184,45 @@ public class GridSqlQuerySplitter {
boolean enforceJoinOrder,
PartitionExtractor partExtractor
) throws SQLException, IgniteCheckedException {
+ SplitterContext.set(distributedJoins);
+
+ try {
+ return split0(
+ conn,
+ prepared,
+ params,
+ collocatedGrpBy,
+ distributedJoins,
+ enforceJoinOrder,
+ partExtractor
+ );
+ }
+ finally {
+ SplitterContext.set(false);
+ }
+ }
+
+ /**
+ * @param conn Connection.
+ * @param prepared Prepared.
+ * @param params Parameters.
+ * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
+ * @param distributedJoins If distributed joins enabled.
+ * @param enforceJoinOrder Enforce join order.
+ * @param partExtractor Partition extractor.
+ * @return Two step query.
+ * @throws SQLException If failed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static GridCacheTwoStepQuery split0(
+ Connection conn,
+ Prepared prepared,
+ Object[] params,
+ boolean collocatedGrpBy,
+ boolean distributedJoins,
+ boolean enforceJoinOrder,
+ PartitionExtractor partExtractor
+ ) throws SQLException, IgniteCheckedException {
if (params == null)
params = GridCacheSqlQuery.EMPTY_PARAMS;
@@ -836,6 +876,7 @@ public class GridSqlQuerySplitter {
* @param wrapAlias Alias of the wrap query.
* @param select The original select.
*/
+ @SuppressWarnings("IfMayBeConditional")
private void pushDownSelectColumns(
Set<GridSqlAlias> tblAliases,
Map<String,GridSqlAlias> cols,
@@ -1237,6 +1278,7 @@ public class GridSqlQuerySplitter {
* @param cols Columns from SELECT clause.
* @return Map of columns with types.
*/
+ @SuppressWarnings("IfMayBeConditional")
private LinkedHashMap<String,?> collectColumns(List<GridSqlAst> cols) {
LinkedHashMap<String, GridSqlType> res = new LinkedHashMap<>(cols.size(), 1f, false);
@@ -1523,6 +1565,7 @@ public class GridSqlQuerySplitter {
* @param hasDistinctAggregate If query has distinct aggregate expression.
* @param first If this is the first aggregate found in this expression.
*/
+ @SuppressWarnings("IfMayBeConditional")
private void splitAggregate(
GridSqlAst parentExpr,
int aggIdx,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterContext.java
new file mode 100644
index 0000000..013cd0e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.sql;
+
+import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
+
+/**
+ * Splitter context which delegates optimization information to H2 internals.
+ */
+public class SplitterContext {
+ /** Empty context. */
+ private static final SplitterContext EMPTY = new SplitterContext(false);
+
+ /** Splitter context. */
+ private static final ThreadLocal<SplitterContext> CTX = ThreadLocal.withInitial(() -> EMPTY);
+
+ /** Whether distributed joins are enabled. */
+ private final boolean distributedJoins;
+
+ /** Query collocation model. */
+ private CollocationModel collocationModel;
+
+ /**
+ * @return Current context.
+ */
+ public static SplitterContext get() {
+ return CTX.get();
+ }
+
+ /**
+ * Set new context.
+ *
+ * @param distributedJoins Whether distributed joins are enabled.
+ */
+ public static void set(boolean distributedJoins) {
+ SplitterContext ctx = distributedJoins ? new SplitterContext(true) : EMPTY;
+
+ CTX.set(ctx);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param distributedJoins Whether distributed joins are enabled.
+ */
+ public SplitterContext(boolean distributedJoins) {
+ this.distributedJoins = distributedJoins;
+ }
+
+ /**
+ * @return Whether distributed joins are enabled.
+ */
+ public boolean distributedJoins() {
+ return distributedJoins;
+ }
+
+ /**
+ * @return Query collocation model.
+ */
+ public CollocationModel collocationModel() {
+ return collocationModel;
+ }
+
+ /**
+ * @param collocationModel Query collocation model.
+ */
+ public void collocationModel(CollocationModel collocationModel) {
+ this.collocationModel = collocationModel;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e67294d..399bf25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -21,12 +21,10 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -76,8 +74,9 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -95,7 +94,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.thread.IgniteThread;
import org.h2.command.Prepared;
@@ -113,8 +111,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
@@ -334,7 +330,18 @@ public class GridMapQueryExecutor {
if (F.isEmpty(cacheIds))
return null;
- Collection<Integer> partIds = wrap(explicitParts);
+ Collection<Integer> partIds;
+
+ if (explicitParts == null)
+ partIds = null;
+ else if (explicitParts.length == 0)
+ partIds = Collections.emptyList();
+ else {
+ partIds = new ArrayList<>(explicitParts.length);
+
+ for (int explicitPart : explicitParts)
+ partIds.add(explicitPart);
+ }
for (int i = 0; i < cacheIds.size(); i++) {
GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
@@ -515,44 +522,6 @@ public class GridMapQueryExecutor {
}
/**
- * @param ints Integers.
- * @return Collection wrapper.
- */
- private static Collection<Integer> wrap(final int[] ints) {
- if (ints == null)
- return null;
-
- if (ints.length == 0)
- return Collections.emptySet();
-
- return new AbstractCollection<Integer>() {
- @SuppressWarnings("NullableProblems")
- @Override public Iterator<Integer> iterator() {
- return new Iterator<Integer>() {
- /** */
- private int i = 0;
-
- @Override public boolean hasNext() {
- return i < ints.length;
- }
-
- @Override public Integer next() {
- return ints[i++];
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override public int size() {
- return ints.length;
- }
- };
- }
-
- /**
* @param node Node.
* @param req Query request.
*/
@@ -563,9 +532,8 @@ public class GridMapQueryExecutor {
final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
- final DistributedJoinMode joinMode = distributedJoinMode(
- req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
- req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+ boolean distributedJoins = req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
+ boolean local = req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL);
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
@@ -660,7 +628,8 @@ public class GridMapQueryExecutor {
partsMap,
parts,
req.pageSize(),
- joinMode,
+ distributedJoins,
+ local,
enforceJoinOrder,
false, // Replicated is always false here (see condition above).
req.timeout(),
@@ -687,7 +656,8 @@ public class GridMapQueryExecutor {
partsMap,
parts,
req.pageSize(),
- joinMode,
+ distributedJoins,
+ local,
enforceJoinOrder,
false,
req.timeout(),
@@ -717,7 +687,8 @@ public class GridMapQueryExecutor {
partsMap,
parts,
req.pageSize(),
- joinMode,
+ distributedJoins,
+ local,
enforceJoinOrder,
replicated,
req.timeout(),
@@ -742,7 +713,8 @@ public class GridMapQueryExecutor {
* @param partsMap Partitions map for unstable topology.
* @param parts Explicit partitions for current node.
* @param pageSize Page size.
- * @param distributedJoinMode Query distributed join mode.
+ * @param distributeJoins Whether distributed joins are enabled.
+ * @param local Local query flag.
* @param lazy Streaming flag.
* @param mvccSnapshot MVCC snapshot.
* @param tx Transaction.
@@ -762,7 +734,8 @@ public class GridMapQueryExecutor {
final Map<UUID, int[]> partsMap,
final int[] parts,
final int pageSize,
- final DistributedJoinMode distributedJoinMode,
+ final boolean distributeJoins,
+ final boolean local,
final boolean enforceJoinOrder,
final boolean replicated,
final int timeout,
@@ -800,7 +773,8 @@ public class GridMapQueryExecutor {
partsMap,
parts,
pageSize,
- distributedJoinMode,
+ distributeJoins,
+ local,
enforceJoinOrder,
replicated,
timeout,
@@ -872,23 +846,37 @@ public class GridMapQueryExecutor {
throw new IllegalStateException();
// Prepare query context.
+ DistributedJoinContext distirbutedJoinCtx = null;
+
+ if (distributeJoins) {
+ distirbutedJoinCtx = new DistributedJoinContext(
+ local,
+ topVer,
+ partsMap,
+ node.id(),
+ reqId,
+ segmentId,
+ pageSize
+ );
+ }
+
+ GridH2QueryType qryTyp = replicated ? REPLICATED : MAP;
+
GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
node.id(),
reqId,
segmentId,
- replicated ? REPLICATED : MAP)
- .filter(h2.backupFilter(topVer, parts))
- .partitionsMap(partsMap)
- .distributedJoinMode(distributedJoinMode)
- .pageSize(pageSize)
- .topologyVersion(topVer)
+ qryTyp,
+ h2.backupFilter(topVer, parts)
+ )
+ .distributedJoinContext(distirbutedJoinCtx)
.reservations(reserved)
.mvccSnapshot(mvccSnapshot)
.lazyWorker(worker);
Connection conn = h2.connections().connectionForThread().connection(schemaName);
- H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
+ H2Utils.setupConnection(conn, distributeJoins, enforceJoinOrder);
GridH2QueryContext.set(qctx);
@@ -897,7 +885,7 @@ public class GridMapQueryExecutor {
try {
if (nodeRess.cancelled(reqId)) {
- GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
+ GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qryTyp);
nodeRess.cancelRequest(reqId);
@@ -1091,7 +1079,7 @@ public class GridMapQueryExecutor {
if (qctx != null) { // No-op if already released.
GridH2QueryContext.clearThreadLocal();
- if (qctx.distributedJoinMode() == OFF)
+ if (qctx.distributedJoinContext() == null)
qctx.clearContext(false);
}
}
@@ -1100,7 +1088,7 @@ public class GridMapQueryExecutor {
* @param node Node.
* @param req DML request.
*/
- private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
+ private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) {
int[] parts = req.queryPartitions();
List<Integer> cacheIds = req.caches();
@@ -1388,7 +1376,7 @@ public class GridMapQueryExecutor {
GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId,
/*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
- loc ? null : Collections.<Message>emptyList(),
+ loc ? null : Collections.emptyList(),
loc ? Collections.<Value[]>emptyList() : null,
false);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3170eae..8097800 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -108,7 +108,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEna
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.setDataPageScanEnabled;
@@ -760,8 +759,16 @@ public class GridReduceQueryExecutor {
H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
- GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
- .pageSize(r.pageSize()).distributedJoinMode(OFF));
+ GridH2QueryContext qctx = new GridH2QueryContext(
+ locNodeId,
+ locNodeId,
+ qryReqId,
+ 0,
+ REDUCE,
+ null
+ );
+
+ GridH2QueryContext.set(qctx);
try {
if (qry.explain())