You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/03/01 14:32:58 UTC
[15/50] [abbrv] ignite git commit: Implemented.
Implemented.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64ba13b0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64ba13b0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64ba13b0
Branch: refs/heads/master
Commit: 64ba13b0a3be6acbf7d629029b460a39c2e2b388
Parents: 4eac51c
Author: AMRepo <an...@gmail.com>
Authored: Mon Feb 20 21:24:29 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 11:52:40 2017 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 48 ++++
.../processors/cache/GridCacheProcessor.java | 3 +
.../processors/cache/IgniteCacheProxy.java | 6 +-
.../closure/GridClosureProcessor.java | 2 +-
.../processors/query/GridQueryIndexing.java | 27 +-
.../processors/query/GridQueryProcessor.java | 141 +++-------
.../messages/GridQueryNextPageRequest.java | 29 +-
.../messages/GridQueryNextPageResponse.java | 29 +-
.../cache/query/GridCacheTwoStepQuery.java | 17 ++
.../processors/query/h2/IgniteH2Indexing.java | 235 ++++++++++++++--
.../query/h2/opt/DistributedJoinMode.java | 51 ++++
.../query/h2/opt/GridH2IndexBase.java | 264 +++++++++++++-----
.../query/h2/opt/GridH2QueryContext.java | 84 ++++--
.../query/h2/opt/GridH2TreeIndex.java | 232 ++++++++++++----
.../query/h2/twostep/GridMapQueryExecutor.java | 227 +++++++++++----
.../query/h2/twostep/GridMergeIndex.java | 39 ++-
.../h2/twostep/GridReduceQueryExecutor.java | 69 +++--
.../h2/twostep/msg/GridH2IndexRangeRequest.java | 60 +++-
.../twostep/msg/GridH2IndexRangeResponse.java | 62 ++++-
.../h2/twostep/msg/GridH2QueryRequest.java | 5 +
.../query/IgniteSqlSegmentedIndexSelfTest.java | 263 ++++++++++++++++++
.../query/IgniteSqlSplitterSelfTest.java | 139 +++++++++-
.../h2/GridIndexingSpiAbstractSelfTest.java | 26 +-
.../FetchingQueryCursorStressTest.java | 277 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
25 files changed, 1917 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 0656dda..149f25a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -223,6 +223,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default threshold for concurrent loading of keys from {@link CacheStore}. */
public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
+ /** Default SQL query parallelism level. */
+ public static final int DFLT_SQL_QUERY_PARALLELISM_LVL = 1;
+
/** Cache name. */
private String name;
@@ -410,6 +413,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Query entities. */
private Collection<QueryEntity> qryEntities;
+ /** */
+ private int qryParallelism = DFLT_SQL_QUERY_PARALLELISM_LVL;
+
/** Empty constructor (all values are initialized to their defaults). */
public CacheConfiguration() {
/* No-op. */
@@ -462,6 +468,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
interceptor = cc.getInterceptor();
invalidate = cc.isInvalidate();
isReadThrough = cc.isReadThrough();
+ qryParallelism = cc.getQueryParallelism();
isWriteThrough = cc.isWriteThrough();
storeKeepBinary = cc.isStoreKeepBinary() != null ? cc.isStoreKeepBinary() : DFLT_STORE_KEEP_BINARY;
listenerConfigurations = cc.listenerConfigurations;
@@ -2108,6 +2115,47 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+ * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+ * certain restrictions on parallelism depending on query type and/or cache type.
+ * <p>
+ * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+ * <ul>
+ * <li>Hint can be used for {@code PARTITIONED} caches only, an exception is thrown otherwise</li>
+ * <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+ * otherwise</li>
+ * </ul>
+ * These restrictions will be removed in future versions of Apache Ignite.
+ * <p>
+ * Defaults to {@code 1}.
+ */
+ public int getQueryParallelism() {
+ return qryParallelism;
+ }
+
+ /**
+ * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+ * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+ * certain restrictions on parallelism depending on query type and/or cache type.
+ * <p>
+ * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+ * <ul>
+ * <li>Hint can be used for {@code PARTITIONED} caches only, an exception is thrown otherwise</li>
+ * <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+ * otherwise</li>
+ * </ul>
+ * These restrictions will be removed in future versions of Apache Ignite.
+ *
+ * @param qryParallelism Query parallelism level.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K,V> setQueryParallelism(int qryParallelism) {
+ this.qryParallelism = qryParallelism;
+
+ return this;
+ }
+
+ /**
* Gets topology validator.
* <p>
* See {@link TopologyValidator} for details.
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7093403..c3e3f3b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -269,6 +269,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cfg.getCacheMode() == REPLICATED)
cfg.setBackups(Integer.MAX_VALUE);
+ if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
+ throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only.");
+
if (cfg.getAffinityMapper() == null)
cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 1381670..f806d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -729,12 +729,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
final SqlQuery p = (SqlQuery)qry;
if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
- return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
- @Override public Iterator<Cache.Entry<K, V>> iterator() {
- return ctx.kernalContext().query().queryLocal(ctx, p,
+ return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
opCtxCall != null && opCtxCall.isKeepBinary());
- }
- });
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 20fb6a0..61ed8a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -902,7 +902,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
+ public <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
throws IgniteCheckedException {
if (c == null)
return new GridFinishedFuture<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..37f0ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -84,35 +84,26 @@ public interface GridQueryIndexing {
/**
* Queries individual fields (generally used by JDBC drivers).
*
- * @param spaceName Space name.
+ * @param cctx Cache context.
* @param qry Query.
- * @param params Query parameters.
* @param filter Space name and key filter.
- * @param enforceJoinOrder Enforce join order of tables in the query.
- * @param timeout Query timeout in milliseconds.
* @param cancel Query cancel.
- * @return Query result.
- * @throws IgniteCheckedException If failed.
+ * @return Cursor.
*/
- public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
- Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout,
- GridQueryCancel cancel) throws IgniteCheckedException;
+ public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+ IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
/**
* Executes regular query.
*
- * @param spaceName Space name.
+ * @param cctx Cache context.
* @param qry Query.
- * @param alias Table alias used in Query.
- * @param params Query parameters.
- * @param type Query return type.
* @param filter Space name and key filter.
- * @return Queried rows.
- * @throws IgniteCheckedException If failed.
+ * @param keepBinary Keep binary flag.
+ * @return Cursor.
*/
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
- String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter)
- throws IgniteCheckedException;
+ public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+ IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
/**
* Executes text query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ee9224b..85744d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -754,42 +754,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
INDEXING.module() + " to classpath or moving it from 'optional' to 'libs' folder).");
}
- /**
- * @param space Space.
- * @param clause Clause.
- * @param params Parameters collection.
- * @param resType Result type.
- * @param filters Filters.
- * @return Key/value rows.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
- final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
- throws IgniteCheckedException {
- checkEnabled();
-
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
- try {
- final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
-
- return executeQuery(GridCacheQueryType.SQL_FIELDS, clause, cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
- @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
- TypeDescriptor type = typesByName.get(new TypeName(space, resType));
-
- if (type == null || !type.registered())
- throw new CacheException("Failed to find SQL table for type: " + resType);
-
- return idx.queryLocalSql(space, clause, null, params, type, filters);
- }
- }, false);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
/**
* @param cctx Cache context.
@@ -829,11 +793,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
- @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
- return idx.queryTwoStep(cctx, qry);
- }
- }, true);
+ return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
+ new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+ @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
+ return idx.queryTwoStep(cctx, qry);
+ }
+ }, true);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -849,7 +814,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param keepBinary Keep binary flag.
* @return Cursor.
*/
- public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(
+ public <K, V> QueryCursor<Cache.Entry<K, V>> queryLocal(
final GridCacheContext<?, ?> cctx,
final SqlQuery qry,
final boolean keepBinary
@@ -859,54 +824,25 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
- new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() {
- @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
- String space = cctx.name();
+ new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+ @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
String type = qry.getType();
- String sqlQry = qry.getSql();
- Object[] params = qry.getArgs();
- TypeDescriptor typeDesc = typesByName.get(
- new TypeName(
- space,
+ GridQueryProcessor.TypeDescriptor typeDesc = typesByName.get(
+ new GridQueryProcessor.TypeName(
+ cctx.name(),
type));
if (typeDesc == null || !typeDesc.registered())
throw new CacheException("Failed to find SQL table for type: " + type);
- final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
- space,
- qry.getSql(),
- qry.getAlias(),
- F.asList(params),
- typeDesc,
- idx.backupFilter(requestTopVer.get(), null));
+ qry.setType(typeDesc.name());
sendQueryExecutedEvent(
- sqlQry,
- params);
-
- return new ClIter<Cache.Entry<K, V>>() {
- @Override public void close() throws Exception {
- i.close();
- }
-
- @Override public boolean hasNext() {
- return i.hasNext();
- }
-
- @Override public Cache.Entry<K, V> next() {
- IgniteBiTuple<K, V> t = i.next();
-
- return new CacheEntryImpl<>(
- (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false),
- (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false));
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ qry.getSql(),
+ qry.getArgs());
+
+ return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
}
}, true);
}
@@ -994,13 +930,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Closeable iterator.
- */
- private interface ClIter<X> extends AutoCloseable, Iterator<X> {
- // No-op.
- }
-
- /**
* @param cctx Cache context.
* @param qry Query.
* @return Iterator.
@@ -1010,34 +939,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- final boolean keepBinary = cctx.keepBinary();
-
return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
@Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
- final String space = cctx.name();
- final String sql = qry.getSql();
- final Object[] args = qry.getArgs();
- final GridQueryCancel cancel = new GridQueryCancel();
+ GridQueryCancel cancel = new GridQueryCancel();
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
- idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+ final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
+ idx.backupFilter(requestTopVer.get(), null), cancel);
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- try {
- sendQueryExecutedEvent(sql, args);
-
- return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, cancel);
-
- cursor.fieldsMeta(res.metaData());
+ sendQueryExecutedEvent(qry.getSql(), qry.getArgs());
- return cursor;
+ return cursor.iterator();
+ }
+ }, cancel) {
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+ if (cursor instanceof QueryCursorImpl)
+ return ((QueryCursorImpl)cursor).fieldsMeta();
+ return super.fieldsMeta();
+ }
+ };
}
}, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index 1feff5a..acea084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
import java.nio.ByteBuffer;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -35,6 +34,9 @@ public class GridQueryNextPageRequest implements Message {
private long qryReqId;
/** */
+ private int segmentId;
+
+ /** */
private int qry;
/** */
@@ -50,11 +52,13 @@ public class GridQueryNextPageRequest implements Message {
/**
* @param qryReqId Query request ID.
* @param qry Query.
+ * @param segmentId Index segment ID.
* @param pageSize Page size.
*/
- public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) {
+ public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize) {
this.qryReqId = qryReqId;
this.qry = qry;
+ this.segmentId = segmentId;
this.pageSize = pageSize;
}
@@ -72,6 +76,11 @@ public class GridQueryNextPageRequest implements Message {
return qry;
}
+ /** @return Index segment ID */
+ public int segmentId() {
+ return segmentId;
+ }
+
/**
* @return Page size.
*/
@@ -119,6 +128,12 @@ public class GridQueryNextPageRequest implements Message {
writer.incrementState();
+ case 3:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -156,6 +171,14 @@ public class GridQueryNextPageRequest implements Message {
reader.incrementState();
+ case 3:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridQueryNextPageRequest.class);
@@ -168,6 +191,6 @@ public class GridQueryNextPageRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 4;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4889069..e85c00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -42,6 +42,9 @@ public class GridQueryNextPageResponse implements Message {
private long qryReqId;
/** */
+ private int segmentId;
+
+ /** */
private int qry;
/** */
@@ -73,6 +76,7 @@ public class GridQueryNextPageResponse implements Message {
/**
* @param qryReqId Query request ID.
+ * @param segmentId Index segment ID.
* @param qry Query.
* @param page Page.
* @param allRows All rows count.
@@ -80,12 +84,13 @@ public class GridQueryNextPageResponse implements Message {
* @param vals Values for rows in this page added sequentially.
* @param plainRows Not marshalled rows for local node.
*/
- public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, int cols,
+ public GridQueryNextPageResponse(long qryReqId, int segmentId, int qry, int page, int allRows, int cols,
Collection<Message> vals, Collection<?> plainRows) {
assert vals != null ^ plainRows != null;
assert cols > 0 : cols;
this.qryReqId = qryReqId;
+ this.segmentId = segmentId;
this.qry = qry;
this.page = page;
this.allRows = allRows;
@@ -102,6 +107,13 @@ public class GridQueryNextPageResponse implements Message {
}
/**
+ * @return Index segment ID.
+ */
+ public int segmentId() {
+ return segmentId;
+ }
+
+ /**
* @return Query.
*/
public int query() {
@@ -202,6 +214,12 @@ public class GridQueryNextPageResponse implements Message {
writer.incrementState();
+ case 7:
+ if (!writer.writeInt("segmentId", segmentId))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -271,6 +289,13 @@ public class GridQueryNextPageResponse implements Message {
reader.incrementState();
+ case 7:
+ segmentId = reader.readInt("segmentId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridQueryNextPageResponse.class);
@@ -283,7 +308,7 @@ public class GridQueryNextPageResponse implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index f53936f..c127eeb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -69,6 +69,9 @@ public class GridCacheTwoStepQuery {
/** */
private List<Integer> extraCaches;
+ /** */
+ private boolean local;
+
/**
* @param originalSql Original query SQL.
* @param schemas Schema names in query.
@@ -229,6 +232,20 @@ public class GridCacheTwoStepQuery {
}
/**
+ * @return {@code True} if query is local.
+ */
+ public boolean isLocal() {
+ return local;
+ }
+
+ /**
+ * @param local Local query flag.
+ */
+ public void local(boolean local) {
+ this.local = local;
+ }
+
+ /**
* @param args New arguments to copy with.
* @return Copy.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..2f40d87 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -77,11 +77,13 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -92,6 +94,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -187,6 +190,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
@@ -810,10 +815,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
removeTable(tbl);
}
- /** {@inheritDoc} */
+ /**
+ * Queries individual fields (generally used by JDBC drivers).
+ *
+ * @param spaceName Space name.
+ * @param qry Query.
+ * @param params Query parameters.
+ * @param filter Space name and key filter.
+ * @param enforceJoinOrder Enforce join order of tables in the query.
+ * @param timeout Query timeout in milliseconds.
+ * @param cancel Query cancel.
+ * @return Query result.
+ * @throws IgniteCheckedException If failed.
+ */
@SuppressWarnings("unchecked")
- @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
- @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
+ public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
+ @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
final int timeout, final GridQueryCancel cancel)
throws IgniteCheckedException {
final Connection conn = connectionForSpace(spaceName);
@@ -833,7 +850,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setEnforceJoinOrder(enforceJoinOrder);
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
- return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+ return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filter, cancel);
}
List<GridQueryFieldMetadata> meta;
@@ -846,7 +863,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
- .filter(filters).distributedJoins(false);
+ .filter(filter).distributedJoinMode(OFF);
return new GridQueryFieldsResultAdapter(meta, null) {
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
@@ -1099,14 +1116,113 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
- final String qry, String alias, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type,
- final IndexingQueryFilter filter) throws IgniteCheckedException {
- final TableDescriptor tbl = tableDescriptor(spaceName, type);
+    @Override public <K, V> QueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
+        final SqlFieldsQuery qry, final IndexingQueryFilter filter, final GridQueryCancel cancel)
+        throws IgniteCheckedException {
+        // Query parallelism above one: delegate to the two-step query path with distributed joins enabled.
+        if (cctx.config().getQueryParallelism() > 1) {
+            qry.setDistributedJoins(true);
+
+            assert qry.isLocal();
+
+            return queryTwoStep(cctx, qry, cancel);
+        }
+
+        final boolean keepBinary = cctx.keepBinary();
+
+        // Plain local execution through the space-name based overload.
+        final GridQueryFieldsResult res = queryLocalSqlFields(cctx.name(), qry.getSql(), F.asList(qry.getArgs()),
+            filter, qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+
+        // The underlying iterator is only requested when the cursor is actually iterated.
+        Iterable<List<?>> rows = new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                try {
+                    return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        };
+
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(rows, cancel);
+
+        cursor.fieldsMeta(res.metaData());
+
+        return cursor;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
+        final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
+        // Query parallelism above one: delegate to the two-step query path with distributed joins enabled.
+        if (cctx.config().getQueryParallelism() > 1) {
+            qry.setDistributedJoins(true);
+
+            assert qry.isLocal();
+
+            return queryTwoStep(cctx, qry);
+        }
+
+        String spaceName = cctx.name();
+        String typeName = qry.getType();
+        String sqlText = qry.getSql();
+        String tblAlias = qry.getAlias();
+        Object[] args = qry.getArgs();
+
+        GridQueryCancel cancel = new GridQueryCancel();
+
+        final GridCloseableIterator<IgniteBiTuple<K, V>> rows = queryLocalSql(spaceName, sqlText, tblAlias,
+            F.asList(args), typeName, filter, cancel);
+
+        // Every call to iterator() wraps the same underlying result iterator.
+        Iterable<Cache.Entry<K, V>> entries = new Iterable<Cache.Entry<K, V>>() {
+            @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                return new ClIter<Cache.Entry<K, V>>() {
+                    @Override public void close() throws Exception {
+                        rows.close();
+                    }
+
+                    @Override public boolean hasNext() {
+                        return rows.hasNext();
+                    }
+
+                    @Override public Cache.Entry<K, V> next() {
+                        IgniteBiTuple<K, V> row = rows.next();
+
+                        K key = (K)cctx.unwrapBinaryIfNeeded(row.get1(), keepBinary, false);
+                        V val = (V)cctx.unwrapBinaryIfNeeded(row.get2(), keepBinary, false);
+
+                        return new CacheEntryImpl<>(key, val);
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+
+        return new QueryCursorImpl<Cache.Entry<K, V>>(entries, cancel);
+    }
+
+ /**
+ * Executes regular query.
+ *
+ * @param spaceName Space name.
+ * @param qry Query.
+ * @param alias Table alias.
+ * @param params Query parameters.
+ * @param type Query return type.
+ * @param filter Space name and key filter.
+ * @return Queried rows.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
+ final String qry, String alias, @Nullable final Collection<Object> params, String type,
+ final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
+ final TableDescriptor tbl = tableDescriptor(type, spaceName);
if (tbl == null)
- throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(),
+ throw new IgniteSQLException("Failed to find SQL table for type: " + type,
IgniteQueryErrorCode.TABLE_NOT_FOUND);
String sql = generateQuery(qry, alias, tbl);
@@ -1115,7 +1231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
setupConnection(conn, false, false);
- GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
+ GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
+ .distributedJoinMode(OFF));
GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
U.currentTimeMillis(), null, true);
@@ -1123,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
runs.put(run.id(), run);
try {
- ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
+ ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, cancel);
return new KeyValIterator(rs);
}
@@ -1178,8 +1295,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fqry.setArgs(qry.getArgs());
fqry.setPageSize(qry.getPageSize());
fqry.setDistributedJoins(qry.isDistributedJoins());
+ fqry.setLocal(qry.isLocal());
- if(qry.getTimeout() > 0)
+ if (qry.getTimeout() > 0)
fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry, null);
@@ -1234,11 +1352,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned();
final boolean grpByCollocated = qry.isCollocated();
+ final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
+
GridCacheTwoStepQuery twoStepQry;
List<GridQueryFieldMetadata> meta;
final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
- distributedJoins, enforceJoinOrder);
+ distributedJoins, enforceJoinOrder, qry.isLocal());
TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
if (cachedQry != null) {
@@ -1251,7 +1371,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
setupConnection(c, distributedJoins, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
- .distributedJoins(distributedJoins));
+ .distributedJoinMode(distributedJoinMode));
PreparedStatement stmt;
@@ -1286,9 +1406,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.clearThreadLocal();
}
- Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+ Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
- if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
+ if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != prepared.isQuery())
throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
@@ -1341,8 +1461,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
extraCaches = null;
}
+ // Prohibit using indexes with different numbers of segments in the same query.
+ checkCacheIndexSegmentation(caches);
+
twoStepQry.caches(caches);
twoStepQry.extraCaches(extraCaches);
+ twoStepQry.local(qry.isLocal());
meta = meta(stmt.getMetaData());
}
@@ -1380,6 +1504,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+     * Verifies that all partitioned caches participating in a query are configured with the same
+     * query parallelism. Non-partitioned caches are skipped.
+     *
+     * @param caches IDs of the caches participating in the query.
+     * @throws IllegalStateException If partitioned caches with different parallelism levels are mixed.
+     */
+    private void checkCacheIndexSegmentation(List<Integer> caches) {
+        if (caches.isEmpty())
+            return; // Nothing to check.
+
+        GridCacheSharedContext<?, ?> sharedCtx = ctx.cache().context();
+
+        // Zero means that no partitioned cache has been seen yet.
+        int expParallelism = 0;
+
+        for (Integer cacheId : caches) {
+            GridCacheContext<?, ?> cctx = sharedCtx.cacheContext(cacheId);
+
+            assert cctx != null;
+
+            if (!cctx.isPartitioned())
+                continue;
+
+            int parallelism = cctx.config().getQueryParallelism();
+
+            if (expParallelism == 0)
+                expParallelism = parallelism;
+            else if (parallelism != expParallelism)
+                throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
+        }
+    }
+
+ /**
* Prepares statement for query.
*
* @param qry Query string.
@@ -1669,7 +1819,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private void cleanupStatementCache() {
long cur = U.currentTimeMillis();
- for(Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
+ for (Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Thread, StatementCache> entry = it.next();
Thread t = entry.getKey();
@@ -1877,6 +2027,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
for (ClusterNode node : nodes) {
if (node.isLocal()) {
+ if (locNode != null)
+ throw new IllegalStateException();
+
locNode = node;
continue;
@@ -2163,23 +2316,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private final boolean enforceJoinOrder;
+ /** */
+ private final boolean isLocal;
+
/**
* @param space Space.
* @param sql Sql.
* @param grpByCollocated Collocated GROUP BY.
* @param distributedJoins Distributed joins enabled.
* @param enforceJoinOrder Enforce join order of tables.
+ * @param isLocal Query is local flag.
*/
private TwoStepCachedQueryKey(String space,
String sql,
boolean grpByCollocated,
boolean distributedJoins,
- boolean enforceJoinOrder) {
+ boolean enforceJoinOrder,
+ boolean isLocal) {
this.space = space;
this.sql = sql;
this.grpByCollocated = grpByCollocated;
this.distributedJoins = distributedJoins;
this.enforceJoinOrder = enforceJoinOrder;
+ this.isLocal = isLocal;
}
/** {@inheritDoc} */
@@ -2204,7 +2363,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (space != null ? !space.equals(that.space) : that.space != null)
return false;
- return sql.equals(that.sql);
+ return isLocal == that.isLocal && sql.equals(that.sql);
}
/** {@inheritDoc} */
@@ -2212,8 +2371,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int res = space != null ? space.hashCode() : 0;
res = 31 * res + sql.hashCode();
res = 31 * res + (grpByCollocated ? 1 : 0);
- res = 31 * res + (distributedJoins ? 1 : 0);
- res = 31 * res + (enforceJoinOrder ? 1 : 0);
+ res = res + (distributedJoins ? 2 : 0);
+ res = res + (enforceJoinOrder ? 4 : 0);
+ res = res + (isLocal ? 8 : 0);
return res;
}
@@ -2572,7 +2732,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
affCol = null;
// Add primary key index.
- idxs.add(new GridH2TreeIndex("_key_PK", tbl, true,
+ idxs.add(createTreeIndex("_key_PK", tbl, true,
treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, affCol)));
if (type().valueClass() == String.class) {
@@ -2618,7 +2778,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cols = treeIndexColumns(cols, keyCol, affCol);
- idxs.add(new GridH2TreeIndex(name, tbl, false, cols));
+ idxs.add(createTreeIndex(name, tbl, false, cols));
}
else if (idx.type() == GEO_SPATIAL)
idxs.add(createH2SpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()])));
@@ -2629,7 +2789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Add explicit affinity key index if nothing alike was found.
if (affCol != null && !affIdxFound) {
- idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false,
+ idxs.add(createTreeIndex("AFFINITY_KEY", tbl, false,
treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, keyCol)));
}
@@ -2676,6 +2836,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteException("Failed to instantiate: " + className, e);
}
}
+
+    /**
+     * Creates a tree index, passing the cache's query parallelism to the index constructor.
+     *
+     * @param idxName Index name.
+     * @param tbl Table.
+     * @param pk Primary key flag.
+     * @param columns Index column list.
+     * @return New tree index. The last constructor argument is {@code 1} when the table has no cache
+     *      context or the configured query parallelism is not greater than one.
+     */
+    private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
+        GridCacheContext<?, ?> cacheCtx = tbl.rowDescriptor().context();
+
+        int segments = 1;
+
+        if (cacheCtx != null) {
+            int parallelism = cacheCtx.config().getQueryParallelism();
+
+            if (parallelism > 1)
+                segments = parallelism;
+        }
+
+        return new GridH2TreeIndex(idxName, tbl, pk, columns, segments);
+    }
}
/**
@@ -2729,6 +2905,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+     * Iterator that is also {@link AutoCloseable}.
+     *
+     * @param <X> Element type.
+     */
+    private interface ClIter<X> extends Iterator<X>, AutoCloseable {
+        // No-op.
+    }
+
+ /**
* Field descriptor.
*/
static class SqlFieldMetadata implements GridQueryFieldMetadata {
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
new file mode 100644
index 0000000..cc06244
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+/**
+ * Modes in which distributed joins can be executed.
+ */
+public enum DistributedJoinMode {
+    /** Distributed joins are disabled. Local joins will be performed instead. */
+    OFF,
+
+    /**
+     * Distributed joins are enabled within the local node only.
+     * <p>
+     * NOTE: This mode is used with segmented indices for local SQL queries. In this case a distributed
+     * join has to be made across the local index segments while range queries to other nodes must be
+     * prevented.
+     */
+    LOCAL_ONLY,
+
+    /** Distributed joins are enabled. */
+    ON;
+
+    /**
+     * Selects the join mode from the two query flags.
+     *
+     * @param isLocal Query local flag.
+     * @param distributedJoins Query distributed joins flag.
+     * @return {@link #OFF} if distributed joins are not requested, otherwise {@link #LOCAL_ONLY} for
+     *      a local query and {@link #ON} for a non-local one.
+     */
+    public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
+        if (!distributedJoins)
+            return OFF;
+
+        return isLocal ? LOCAL_ONLY : ON;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index bab219c..131e03b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -81,6 +81,8 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyIterator;
import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
@@ -178,6 +180,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
+ * Returns the index segment to use for the current query context. This base implementation
+ * always returns {@code 0}.
+ *
+ * @return Index segment ID for current query context.
+ */
+ protected int threadLocalSegment() {
+ return 0;
+ }
+
+ /**
* If the index supports rebuilding it has to creates its own copy.
*
* @return Rebuilt copy.
@@ -252,7 +261,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
// because on run stage reordering of joined tables by Optimizer is explicitly disabled
// and thus multiplier will be always the same, so it will not affect choice of index.
// Query expressions can not be distributed as well.
- if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression())
+ if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression())
return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
// We have to clear this cache because normally sub-query plan cost does not depend on anything
@@ -363,7 +372,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
@Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
GridH2QueryContext qctx = GridH2QueryContext.get();
- if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
+ if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned())
return null;
IndexColumn affCol = getTable().getAffinityKeyColumn();
@@ -381,9 +390,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
ucast = false;
}
- GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+ GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
- return new DistributedLookupBatch(cctx, ucast, affColId);
+ boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
+
+ return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
}
/**
@@ -437,18 +448,18 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param node Requesting node.
* @param msg Request message.
*/
- private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) {
- GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
- msg.originNodeId(),
- msg.queryId(),
- MAP);
-
+ private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
res.originNodeId(msg.originNodeId());
res.queryId(msg.queryId());
+ res.originSegmentId(msg.originSegmentId());
+ res.segment(msg.segment());
res.batchLookupId(msg.batchLookupId());
+ GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(),
+ msg.queryId(), msg.originSegmentId(), MAP);
+
if (qctx == null)
res.status(STATUS_NOT_FOUND);
else {
@@ -461,11 +472,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
assert !msg.bounds().isEmpty() : "empty bounds";
- src = new RangeSource(msg.bounds(), snapshot0, qctx.filter());
+ src = new RangeSource(msg.bounds(), msg.segment(), snapshot0, qctx.filter());
}
else {
// This is request to fetch next portion of data.
- src = qctx.getSource(node.id(), msg.batchLookupId());
+ src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId());
assert src != null;
}
@@ -491,11 +502,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (src.hasMoreRows()) {
// Save source for future fetches.
if (msg.bounds() != null)
- qctx.putSource(node.id(), msg.batchLookupId(), src);
+ qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
}
else if (msg.bounds() == null) {
// Drop saved source.
- qctx.putSource(node.id(), msg.batchLookupId(), null);
+ qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
}
assert !ranges.isEmpty();
@@ -520,17 +531,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
*/
private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
- msg.originNodeId(), msg.queryId(), MAP);
+ msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP);
if (qctx == null)
return;
- Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+ Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
if (streams == null)
return;
- RangeStream stream = streams.get(node);
+ RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
assert stream != null;
@@ -549,47 +560,69 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param qctx Query context.
* @param batchLookupId Batch lookup ID.
+ * @param segmentId Segment ID.
* @return Index range request.
*/
- private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) {
+ private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
req.originNodeId(qctx.originNodeId());
req.queryId(qctx.queryId());
+ req.originSegmentId(qctx.segment());
+ req.segment(segmentId);
req.batchLookupId(batchLookupId);
return req;
}
+
/**
* @param qctx Query context.
* @param cctx Cache context.
+ * @param isLocalQry Local query flag.
* @return Collection of node segment keys for broadcasting (may be empty for a local query).
*/
- private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
+ private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) {
Map<UUID, int[]> partMap = qctx.partitionsMap();
- List<ClusterNode> res;
+ List<ClusterNode> nodes;
+
+ if (isLocalQry) {
+ if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
+ return Collections.<SegmentKey>emptyList(); // Prevent remote index call for local queries.
- if (partMap == null)
- res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+ nodes = Collections.singletonList(cctx.localNode());
+ }
else {
- res = new ArrayList<>(partMap.size());
+ if (partMap == null)
+ nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+ else {
+ nodes = new ArrayList<>(partMap.size());
- GridKernalContext ctx = kernalContext();
+ GridKernalContext ctx = kernalContext();
- for (UUID nodeId : partMap.keySet()) {
- ClusterNode node = ctx.discovery().node(nodeId);
+ for (UUID nodeId : partMap.keySet()) {
+ ClusterNode node = ctx.discovery().node(nodeId);
- if (node == null)
- throw new GridH2RetryException("Failed to find node.");
+ if (node == null)
+ throw new GridH2RetryException("Failed to find node.");
- res.add(node);
+ nodes.add(node);
+ }
}
+
+ if (F.isEmpty(nodes))
+ throw new GridH2RetryException("Failed to collect affinity nodes.");
}
- if (F.isEmpty(res))
- throw new GridH2RetryException("Failed to collect affinity nodes.");
+ int segmentsCount = segmentsCount();
+
+ List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
+
+ for (ClusterNode node : nodes) {
+ for (int seg = 0; seg < segmentsCount; seg++)
+ res.add(new SegmentKey(node, seg));
+ }
return res;
}
@@ -598,26 +631,81 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param cctx Cache context.
* @param qctx Query context.
* @param affKeyObj Affinity key.
- * @return Cluster nodes or {@code null} if affinity key is a null value.
+ * @param isLocalQry Local query flag.
+ * @return Segment key for Affinity key.
*/
- private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) {
+ private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) {
assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
ClusterNode node;
- if (qctx.partitionsMap() != null) {
- // If we have explicit partitions map, we have to use it to calculate affinity node.
- UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx);
+ int partition = cctx.affinity().partition(affKeyObj);
+
+ if (isLocalQry) {
+ if (qctx.partitionsMap() != null) {
+ // If we have explicit partitions map, we have to use it to calculate affinity node.
+ UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+ if(!cctx.localNodeId().equals(nodeId))
+ return null; // Prevent remote index call for local queries.
+ }
+
+ if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+ return null;
+
+ node = cctx.localNode();
+ }
+ else{
+ if (qctx.partitionsMap() != null) {
+ // If we have explicit partitions map, we have to use it to calculate affinity node.
+ UUID nodeId = qctx.nodeForPartition(partition, cctx);
node = cctx.discovery().node(nodeId);
}
else // Get primary node for current topology version.
node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
- if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
- throw new GridH2RetryException("Failed to find node.");
+ if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+ throw new GridH2RetryException("Failed to find node.");
+ }
+
+ return new SegmentKey(node, segment(partition));
+ }
+
+    /**
+     * Key identifying one index segment on one node. Two keys are equal when their segment IDs match
+     * and their nodes have equal IDs.
+     */
+    protected class SegmentKey {
+        /** Cluster node. */
+        final ClusterNode node;
+
+        /** Segment ID. */
+        final int segmentId;
+
+        /**
+         * @param node Cluster node, must not be {@code null}.
+         * @param segmentId Segment ID.
+         */
+        SegmentKey(ClusterNode node, int segmentId) {
+            assert node != null;
+
+            this.node = node;
+            this.segmentId = segmentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SegmentKey key = (SegmentKey)o;
+
+            return segmentId == key.segmentId && node.id().equals(key.node.id());
-        return node;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Hash the node ID, not the node object, so that hashCode() is always consistent with equals().
+            return 31 * node.id().hashCode() + segmentId;
+        }
     }
/**
@@ -740,6 +828,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
return database.createRow(vals0, MEMORY_CALCULATE);
}
+ /**
+ * This base implementation reports a single segment.
+ *
+ * @return Index segments count.
+ */
+ protected int segmentsCount() {
+ return 1;
+ }
+
+ /**
+ * This base implementation maps every partition to segment {@code 0}.
+ *
+ * @param partition Partition idx.
+ * @return Segment ID for the given partition.
+ */
+ protected int segment(int partition) {
+ return 0;
+ }
+
+
/**
* Simple cursor from a single node.
*/
@@ -752,14 +854,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param rangeId Range ID.
- * @param nodes Remote nodes.
+ * @param keys Remote index segment keys.
* @param rangeStreams Range streams.
*/
- private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
- assert nodes.size() == 1;
+ UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) {
+ assert keys.size() == 1;
this.rangeId = rangeId;
- this.stream = rangeStreams.get(F.first(nodes));
+ this.stream = rangeStreams.get(F.first(keys));
assert stream != null;
}
@@ -803,20 +905,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
/**
* @param rangeId Range ID.
- * @param nodes Remote nodes.
+ * @param segmentKeys Remote index segment keys.
* @param rangeStreams Range streams.
*/
- private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
- assert nodes.size() > 1;
+ BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) {
this.rangeId = rangeId;
- streams = new RangeStream[nodes.size()];
+ streams = new RangeStream[segmentKeys.size()];
int i = 0;
- for (ClusterNode node : nodes) {
- RangeStream stream = rangeStreams.get(node);
+ for (SegmentKey segmentKey : segmentKeys) {
+ RangeStream stream = rangeStreams.get(segmentKey);
assert stream != null;
@@ -928,16 +1029,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
final int affColId;
/** */
+ private final boolean localQuery;
+
+ /** */
GridH2QueryContext qctx;
/** */
int batchLookupId;
/** */
- Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap();
+ Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
/** */
- List<ClusterNode> broadcastNodes;
+ List<SegmentKey> broadcastSegments;
/** */
List<Future<Cursor>> res = Collections.emptyList();
@@ -952,11 +1056,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param cctx Cache Cache context.
* @param ucast Unicast or broadcast query.
* @param affColId Affinity column ID.
+ * @param localQuery Local query flag.
*/
- private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
+ DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) {
this.cctx = cctx;
this.ucast = ucast;
this.affColId = affColId;
+ this.localQuery = localQuery;
}
/**
@@ -1028,7 +1134,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
- List<ClusterNode> nodes;
+ List<SegmentKey> segmentKeys;
Future<Cursor> fut;
if (affKey != null) {
@@ -1036,17 +1142,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
return false;
- nodes = F.asList(rangeNode(cctx, qctx, affKey));
+ segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery));
}
else {
// Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
- if (broadcastNodes == null)
- broadcastNodes = broadcastNodes(qctx, cctx);
+ if (broadcastSegments == null)
+ broadcastSegments = broadcastSegments(qctx, cctx, localQuery);
- nodes = broadcastNodes;
+ segmentKeys = broadcastSegments;
}
- assert !F.isEmpty(nodes) : nodes;
+ if (localQuery && segmentKeys.isEmpty())
+ return false; // Nothing to do
+
+ assert !F.isEmpty(segmentKeys) : segmentKeys;
final int rangeId = res.size();
@@ -1058,21 +1167,21 @@ public abstract class GridH2IndexBase extends BaseIndex {
GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
// Add range to every message of every participating node.
- for (int i = 0; i < nodes.size(); i++) {
- ClusterNode node = nodes.get(i);
- assert node != null;
+ for (int i = 0; i < segmentKeys.size(); i++) {
+ SegmentKey segmentKey = segmentKeys.get(i);
+ assert segmentKey != null;
- RangeStream stream = rangeStreams.get(node);
+ RangeStream stream = rangeStreams.get(segmentKey);
List<GridH2RowRangeBounds> bounds;
if (stream == null) {
- stream = new RangeStream(qctx, node);
+ stream = new RangeStream(qctx, segmentKey.node);
- stream.req = createRequest(qctx, batchLookupId);
+ stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId);
stream.req.bounds(bounds = new ArrayList<>());
- rangeStreams.put(node, stream);
+ rangeStreams.put(segmentKey, stream);
}
else
bounds = stream.req.bounds();
@@ -1084,9 +1193,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
batchFull = true;
}
- fut = new DoneFuture<>(nodes.size() == 1 ?
- new UnicastCursor(rangeId, nodes, rangeStreams) :
- new BroadcastCursor(rangeId, nodes, rangeStreams));
+ fut = new DoneFuture<>(segmentKeys.size() == 1 ?
+ new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
+ new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
res.add(fut);
@@ -1138,7 +1247,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
batchLookupId = 0;
rangeStreams = Collections.emptyMap();
- broadcastNodes = null;
+ broadcastSegments = null;
batchFull = false;
findCalled = false;
res = Collections.emptyList();
@@ -1244,7 +1353,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
if (remainingRanges > 0) {
if (req.bounds() != null)
- req = createRequest(qctx, req.batchLookupId());
+ req = createRequest(qctx, req.batchLookupId(), req.segment());
// Prefetch next page.
send(singletonList(node), req);
@@ -1366,6 +1475,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
/** */
+ private final int segment;
+
+ /** */
final IndexingQueryFilter filter;
/**
@@ -1375,9 +1487,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
*/
RangeSource(
Iterable<GridH2RowRangeBounds> bounds,
+ int segment,
ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree,
IndexingQueryFilter filter
) {
+ this.segment = segment;
this.filter = filter;
this.tree = tree;
boundsIter = bounds.iterator();
@@ -1435,7 +1549,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
SearchRow first = toSearchRow(bounds.first());
SearchRow last = toSearchRow(bounds.last());
- ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead();
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = tree != null ? tree : treeForRead(segment);
curRange = doFind0(t, first, true, last, filter);
@@ -1452,9 +1566,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
/**
- * @return Snapshot for current thread if there is one.
+ * @param segment Segment Id.
+ * @return Snapshot for requested segment if there is one.
*/
- protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+ protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int segment) {
throw new UnsupportedOperationException();
}
@@ -1505,7 +1620,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
this.fltr = qryFilter.forSpace(spaceName);
this.isValRequired = qryFilter.isValueRequired();
- } else {
+ }
+ else {
this.fltr = null;
this.isValRequired = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 19ea2b2..a7ee0dc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -32,6 +32,7 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
/**
@@ -79,7 +80,7 @@ public class GridH2QueryContext {
private UUID[] partsNodes;
/** */
- private boolean distributedJoins;
+ private DistributedJoinMode distributedJoinMode;
/** */
private int pageSize;
@@ -94,7 +95,22 @@ public class GridH2QueryContext {
* @param type Query type.
*/
public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
- key = new Key(locNodeId, nodeId, qryId, type);
+ assert type != MAP;
+
+ key = new Key(locNodeId, nodeId, qryId, 0, type);
+ }
+
+ /**
+ * @param locNodeId Local node ID.
+ * @param nodeId The node who initiated the query.
+ * @param qryId The query ID.
+ * @param segmentId Index segment ID.
+ * @param type Query type.
+ */
+ public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+ assert segmentId == 0 || type == MAP;
+
+ key = new Key(locNodeId, nodeId, qryId, segmentId, type);
}
/**
@@ -133,20 +149,20 @@ public class GridH2QueryContext {
}
/**
- * @param distributedJoins Distributed joins can be run in this query.
+ * @param distributedJoinMode Distributed join mode.
* @return {@code this}.
*/
- public GridH2QueryContext distributedJoins(boolean distributedJoins) {
- this.distributedJoins = distributedJoins;
+ public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) {
+ this.distributedJoinMode = distributedJoinMode;
return this;
}
/**
- * @return {@code true} If distributed joins can be run in this query.
+ * @return Distributed join mode.
*/
- public boolean distributedJoins() {
- return distributedJoins;
+ public DistributedJoinMode distributedJoinMode() {
+ return distributedJoinMode;
}
/**
@@ -226,6 +242,11 @@ public class GridH2QueryContext {
return nodeIds[p];
}
+ /** @return index segment ID. */
+ public int segment() {
+ return key.segmentId;
+ }
+
/**
* @param idxId Index ID.
* @param snapshot Index snapshot.
@@ -303,11 +324,12 @@ public class GridH2QueryContext {
/**
* @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
* @param src Range source.
*/
- public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) {
- SourceKey srcKey = new SourceKey(ownerId, batchLookupId);
+ public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
+ SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
if (src != null) {
if (sources == null)
@@ -321,15 +343,16 @@ public class GridH2QueryContext {
/**
* @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
* @return Range source.
*/
@SuppressWarnings("unchecked")
- public synchronized <T> T getSource(UUID ownerId, int batchLookupId) {
+ public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
if (sources == null)
return null;
- return (T)sources.get(new SourceKey(ownerId, batchLookupId));
+ return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
}
/**
@@ -356,7 +379,7 @@ public class GridH2QueryContext {
assert qctx.get() == null;
// We need MAP query context to be available to other threads to run distributed joins.
- if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null)
+ if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
throw new IllegalStateException("Query context is already set.");
qctx.set(x);
@@ -381,7 +404,14 @@ public class GridH2QueryContext {
* @return {@code True} if context was found.
*/
public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
- return doClear(new Key(locNodeId, nodeId, qryId, type), false);
+ boolean res = false;
+
+ for (Key key : qctxs.keySet()) {
+ if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
+ res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false);
+ }
+
+ return res;
}
/**
@@ -463,6 +493,7 @@ public class GridH2QueryContext {
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
* @param qryId The query ID.
+ * @param segmentId Index segment ID.
* @param type Query type.
* @return Query context.
*/
@@ -470,9 +501,10 @@ public class GridH2QueryContext {
UUID locNodeId,
UUID nodeId,
long qryId,
+ int segmentId,
GridH2QueryType type
) {
- return qctxs.get(new Key(locNodeId, nodeId, qryId, type));
+ return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type));
}
/**
@@ -528,15 +560,19 @@ public class GridH2QueryContext {
private final long qryId;
/** */
+ private final int segmentId;
+
+ /** */
private final GridH2QueryType type;
/**
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
* @param qryId The query ID.
+ * @param segmentId Index segment ID.
* @param type Query type.
*/
- private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+ private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
assert locNodeId != null;
assert nodeId != null;
assert type != null;
@@ -544,6 +580,7 @@ public class GridH2QueryContext {
this.locNodeId = locNodeId;
this.nodeId = nodeId;
this.qryId = qryId;
+ this.segmentId = segmentId;
this.type = type;
}
@@ -568,6 +605,7 @@ public class GridH2QueryContext {
res = 31 * res + nodeId.hashCode();
res = 31 * res + (int)(qryId ^ (qryId >>> 32));
res = 31 * res + type.hashCode();
+ res = 31 * res + segmentId;
return res;
}
@@ -586,14 +624,19 @@ public class GridH2QueryContext {
UUID ownerId;
/** */
+ int segmentId;
+
+ /** */
int batchLookupId;
/**
* @param ownerId Owner node ID.
+ * @param segmentId Index segment ID.
* @param batchLookupId Batch lookup ID.
*/
- SourceKey(UUID ownerId, int batchLookupId) {
+ SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
this.ownerId = ownerId;
+ this.segmentId = segmentId;
this.batchLookupId = batchLookupId;
}
@@ -601,12 +644,15 @@ public class GridH2QueryContext {
@Override public boolean equals(Object o) {
SourceKey srcKey = (SourceKey)o;
- return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId);
+ return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId &&
+ ownerId.equals(srcKey.ownerId);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return 31 * ownerId.hashCode() + batchLookupId;
+ int hash = ownerId.hashCode();
+ hash = 31 * hash + segmentId;
+ return 31 * hash + batchLookupId;
}
}
}