You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/09 08:38:56 UTC
[42/50] [abbrv] ignite git commit: Merge branch 'ignite-1.6.10' into
ignite-1.7.3
Merge branch 'ignite-1.6.10' into ignite-1.7.3
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fac786b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fac786b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fac786b
Branch: refs/heads/master
Commit: 5fac786b6dbb179127ac725180acb54d0f6f4b0a
Parents: d0f4b23 a863eee
Author: devozerov <vo...@gridgain.com>
Authored: Mon Oct 31 21:31:05 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Mon Oct 31 21:58:20 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteScheduler.java | 13 +
.../cache/query/QueryCancelledException.java | 35 +++
.../apache/ignite/cache/query/QueryCursor.java | 8 +-
.../ignite/cache/query/SqlFieldsQuery.java | 26 ++
.../org/apache/ignite/cache/query/SqlQuery.java | 25 ++
.../ignite/internal/IgniteSchedulerImpl.java | 18 ++
.../processors/cache/QueryCursorImpl.java | 92 +++++--
.../closure/GridClosureProcessor.java | 1 +
.../processors/query/GridQueryCancel.java | 84 +++++++
.../processors/query/GridQueryFieldsResult.java | 3 +-
.../query/GridQueryFieldsResultAdapter.java | 3 +-
.../processors/query/GridQueryIndexing.java | 11 +-
.../processors/query/GridQueryProcessor.java | 105 ++++++--
.../twostep/messages/GridQueryFailResponse.java | 34 ++-
.../h2/twostep/messages/GridQueryRequest.java | 31 ++-
.../junits/GridTestKernalContext.java | 1 -
.../processors/query/h2/IgniteH2Indexing.java | 160 +++++++++---
.../query/h2/twostep/GridMapQueryExecutor.java | 66 +++--
.../h2/twostep/GridReduceQueryExecutor.java | 117 ++++++---
...niteCacheDistributedQueryCancelSelfTest.java | 176 +++++++++++++
...butedQueryStopOnCancelOrTimeoutSelfTest.java | 248 +++++++++++++++++++
...cheQueryAbstractDistributedJoinSelfTest.java | 7 +
.../IgniteCacheQueryNodeRestartSelfTest2.java | 125 ++++++----
...nCancelOrTimeoutDistributedJoinSelfTest.java | 7 +
...eCacheLocalQueryCancelOrTimeoutSelfTest.java | 158 ++++++++++++
.../h2/GridIndexingSpiAbstractSelfTest.java | 4 +-
.../IgniteCacheQuerySelfTestSuite2.java | 8 +
27 files changed, 1388 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 48dab6b,d1a5117..d3f85af
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@@ -56,12 -58,9 +58,15 @@@ public final class SqlFieldsQuery exten
/** Collocation flag. */
private boolean collocated;
+ /** Query timeout in millis. */
+ private int timeout;
+
+ /** */
+ private boolean enforceJoinOrder;
+
+ /** */
+ private boolean distributedJoins;
+
/**
* Constructs SQL fields query.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index f809b8d,51c6cb5..83e171d
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@@ -43,9 -44,9 +44,12 @@@ public final class SqlQuery<K, V> exten
@GridToStringInclude
private Object[] args;
+ /** Timeout in millis. */
+ private int timeout;
+
+ /** */
+ private boolean distributedJoins;
+
/**
* Constructs query for the given type name and SQL query.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 643cb8c,b1b3c68..6bffa5d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@@ -80,13 -80,15 +80,15 @@@ public interface GridQueryIndexing
* @param spaceName Space name.
* @param qry Query.
* @param params Query parameters.
- * @param filters Space name and key filters.
+ * @param filter Space name and key filter.
+ * @param enforceJoinOrder Enforce join order of tables in the query.
+ * @param timeout Query timeout in milliseconds.
+ * @param cancel Query cancel.
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
- public GridQueryFieldsResult execute(@Nullable String spaceName, String qry,
- Collection<Object> params, IndexingQueryFilter filters, int timeout, GridQueryCancel cancel)
- throws IgniteCheckedException;
+ public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
- Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder) throws IgniteCheckedException;
++ Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout, GridQueryCancel cancel) throws IgniteCheckedException;
/**
* Executes regular query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 12cf962,3d185c6..27c0b71
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -17,6 -17,6 +17,7 @@@
package org.apache.ignite.internal.processors.query;
++import java.util.concurrent.TimeUnit;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;
import java.lang.reflect.Member;
@@@ -719,6 -740,43 +734,43 @@@ public class GridQueryProcessor extend
}
/**
+ * @param space Space.
+ * @param clause Clause.
+ * @param params Parameters collection.
+ * @param resType Result type.
+ * @param filters Filters.
+ * @return Key/value rows.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
+ final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
+ throws IgniteCheckedException {
+ checkEnabled();
+
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+ try {
+ final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
+
+ return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
+ @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
+ TypeDescriptor type = typesByName.get(new TypeName(space, resType));
+
+ if (type == null || !type.registered())
+ throw new CacheException("Failed to find SQL table for type: " + resType);
+
- return idx.query(space, clause, params, type, filters);
++ return idx.queryLocalSql(space, clause, params, type, filters);
+ }
+ }, false);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
* @return Cursor.
@@@ -868,6 -925,24 +920,24 @@@
}
/**
+ * @param timeout Timeout.
+ * @param timeUnit Time unit.
+ * @return Converted time.
+ */
+ public static int validateTimeout(int timeout, TimeUnit timeUnit) {
+ A.ensure(timeUnit != TimeUnit.MICROSECONDS && timeUnit != TimeUnit.NANOSECONDS,
- "timeUnit minimal resolution is millisecond.");
++ "timeUnit minimal resolution is millisecond.");
+
+ A.ensure(timeout >= 0, "timeout value should be non-negative.");
+
+ long tmp = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+
+ A.ensure(timeout <= Integer.MAX_VALUE, "timeout value too large.");
+
+ return (int) tmp;
+ }
+
+ /**
* Closeable iterator.
*/
private interface ClIter<X> extends AutoCloseable, Iterator<X> {
@@@ -888,14 -963,13 +958,13 @@@
return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
@Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
- String space = cctx.name();
- String sql = qry.getSql();
- Object[] args = qry.getArgs();
+ final String space = cctx.name();
+ final String sql = qry.getSql();
+ final Object[] args = qry.getArgs();
+ final GridQueryCancel cancel = new GridQueryCancel();
- final GridQueryFieldsResult res = idx.execute(space, sql, F.asList(args),
- idx.backupFilter(null, requestTopVer.get(), null), qry.getTimeout(), cancel);
+ final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
- idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder());
-
- sendQueryExecutedEvent(sql, args);
++ idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
@@@ -2412,4 -2501,4 +2487,4 @@@
private enum IndexType {
ASC, DESC, TEXT
}
--}
++}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index f7de86c,550cf9b..6e42f1c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@@ -167,9 -169,16 +173,16 @@@ public class GridQueryRequest implement
}
/**
+ * @return Timeout.
+ */
+ public int timeout() {
+ return this.timeout;
+ }
+
+ /**
* @return Queries.
*/
- public Collection<GridCacheSqlQuery> queries() throws IgniteCheckedException {
+ public Collection<GridCacheSqlQuery> queries() {
return qrys;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 331b6f9,ab332c1..ed42bc6
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -75,8 -76,8 +77,9 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
@@@ -132,9 -128,9 +135,10 @@@ import org.h2.index.Index
import org.h2.index.SpatialIndex;
import org.h2.jdbc.JdbcConnection;
import org.h2.jdbc.JdbcPreparedStatement;
+ import org.h2.jdbc.JdbcStatement;
import org.h2.message.DbException;
import org.h2.mvstore.cache.CacheLongKeyLIRS;
+import org.h2.result.SortOrder;
import org.h2.server.web.WebServer;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
@@@ -390,7 -356,7 +394,7 @@@ public class IgniteH2Indexing implement
PreparedStatement stmt = cache.get(sql);
- if (stmt != null && !stmt.isClosed()) {
- if (stmt != null && !((JdbcStatement)stmt).wasCancelled()) {
++ if (stmt != null && !stmt.isClosed() && !((JdbcStatement)stmt).wasCancelled()) {
assert stmt.getConnection() == c;
return stmt;
@@@ -764,31 -733,36 +768,36 @@@
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public GridQueryFieldsResult execute(@Nullable final String spaceName, final String qry,
- @Nullable final Collection<Object> params, final IndexingQueryFilter filters,
+ @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
- @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder)
++ @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
+ final int timeout, final GridQueryCancel cancel)
throws IgniteCheckedException {
- Connection conn = connectionForSpace(spaceName);
- setFilters(filters);
++ final Connection conn = connectionForSpace(spaceName);
- try {
- final Connection conn = connectionForThread(schema(spaceName));
+ initLocalQueryContext(conn, enforceJoinOrder, filters);
+ try {
- ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, qry, params, true);
+ final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
- List<GridQueryFieldMetadata> meta = null;
+ List<GridQueryFieldMetadata> meta;
- if (rs != null) {
- try {
- meta = meta(rs.getMetaData());
- }
- catch (SQLException e) {
- throw new IgniteCheckedException("Failed to get meta data.", e);
- }
+ try {
+ meta = meta(stmt.getMetaData());
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Cannot prepare query metadata", e);
}
- return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs));
+ return new GridQueryFieldsResultAdapter(meta, null) {
+ @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{
+ ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
+
+ return new FieldsIterator(rs);
+ }
+ };
}
finally {
- setFilters(null);
+ GridH2QueryContext.clearThreadLocal();
}
}
@@@ -877,12 -851,54 +886,52 @@@
bindParameters(stmt, params);
+ return stmt;
+ }
+
+ /**
+ * Executes sql query statement.
+ *
+ * @param conn Connection,.
+ * @param stmt Statement.
+ * @param cancel Query cancel.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
+ int timeoutMillis, @Nullable GridQueryCancel cancel)
+ throws IgniteCheckedException {
+
+ if (timeoutMillis > 0)
+ ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(timeoutMillis);
+
+ if (cancel != null) {
+ cancel.set(new Runnable() {
+ @Override public void run() {
+ try {
+ stmt.cancel();
- } catch (SQLException ignored) {
++ }
++ catch (SQLException ignored) {
+ // No-op.
+ }
+ }
+ });
+ }
+
try {
return stmt.executeQuery();
}
catch (SQLException e) {
+ // Throw special exception.
+ if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+ throw new QueryCancelledException();
+
throw new IgniteCheckedException("Failed to execute SQL query.", e);
}
+ finally {
- if(cancel != null)
- cancel.setCompleted();
-
+ if (timeoutMillis > 0)
+ ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(0);
+ }
}
/**
@@@ -982,14 -1034,10 +1057,14 @@@
if (tbl == null)
throw new CacheException("Failed to find SQL table for type: " + type.name());
- setFilters(filters);
+ String sql = generateQuery(qry, tbl);
+
+ Connection conn = connectionForThread(tbl.schemaName());
+
+ initLocalQueryContext(conn, false, filter);
try {
- ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true);
- ResultSet rs = executeQuery(spaceName, qry, params, tbl, null);
++ ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
return new KeyValIterator(rs);
}
@@@ -998,18 -1046,20 +1073,20 @@@
}
}
- /** {@inheritDoc} */
- private Iterable<List<?>> doQueryTwoStep(final GridCacheContext<?, ?> cctx, final GridCacheTwoStepQuery qry,
- final boolean keepCacheObj,
+ /**
+ * @param cctx Cache context.
+ * @param qry Query.
+ * @param keepCacheObj Flag to keep cache object.
+ * @param enforceJoinOrder Enforce join order of tables.
+ * @return Iterable result.
+ */
+ private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry,
- final boolean keepCacheObj, final boolean enforceJoinOrder) {
++ final boolean keepCacheObj, final boolean enforceJoinOrder,
+ final int timeoutMillis,
+ final GridQueryCancel cancel) {
return new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder);
- try {
- return rdcQryExec.query(cctx, qry, keepCacheObj, timeoutMillis, cancel);
- }
- finally {
- if (cancel != null)
- cancel.setCompleted();
- }
++ return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel);
}
};
}
@@@ -1038,8 -1088,10 +1115,11 @@@
fqry.setArgs(qry.getArgs());
fqry.setPageSize(qry.getPageSize());
+ fqry.setDistributedJoins(qry.isDistributedJoins());
+ if(qry.getTimeout() > 0)
+ fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
+
final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
@@@ -1203,8 -1198,10 +1283,10 @@@
twoStepQry.pageSize(qry.getPageSize());
+ GridQueryCancel cancel = new GridQueryCancel();
+
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder));
- doQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), qry.getTimeout(), cancel), cancel);
++ runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel), cancel);
cursor.fieldsMeta(meta);
@@@ -1263,7 -1271,7 +1345,7 @@@
if (!upper.startsWith("FROM"))
from = " FROM " + t +
(upper.startsWith("WHERE") || upper.startsWith("ORDER") || upper.startsWith("LIMIT") ?
-- " " : " WHERE ");
++ " " : " WHERE ");
qry = "SELECT " + t + "." + KEY_FIELD_NAME + ", " + t + "." + VAL_FIELD_NAME + from + qry;
@@@ -1561,29 -1552,10 +1643,29 @@@
if (tbl == null)
return -1;
- IgniteSpiCloseableIterator<List<?>> iter = execute(spaceName,
- "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null, 0, null).iterator();
+ Connection conn = connectionForSpace(spaceName);
+
+ setupConnection(conn, false, false);
+
- ResultSet rs = executeSqlQuery(conn,
- "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, false);
-
+ try {
++ ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(), false),
++ 0, null);
++
+ if (!rs.next())
+ throw new IllegalStateException();
- return ((Number)iter.next().get(0)).longValue();
+ return rs.getLong(1);
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * @return Busy lock.
+ */
+ public GridSpinBusyLock busyLock() {
+ return busyLock;
}
/**
@@@ -1875,11 -1765,11 +1957,11 @@@
U.error(log, "Failed to drop schema on cache stop (will ignore): " + U.maskName(ccfg.getName()), e);
}
- for (Iterator<Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
- it.hasNext();) {
- Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery> e = it.next();
+ for (Iterator<Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
- it.hasNext();) {
++ it.hasNext();) {
+ Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery> e = it.next();
- if (F.eq(e.getKey().get1(), ccfg.getName()))
+ if (F.eq(e.getKey().space, ccfg.getName()))
it.remove();
}
}
@@@ -3000,4 -2715,10 +3082,10 @@@
lastUsage = U.currentTimeMillis();
}
}
- }
+
+ /** {@inheritDoc} */
+ @Override public void cancelAllQueries() {
+ for (Connection conn : conns)
+ U.close(conn, log);
+ }
-}
++}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index bb5e419,1f05bf7..0314b3d
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@@ -35,6 -32,6 +35,7 @@@ import javax.cache.CacheException
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
++import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
@@@ -51,12 -49,9 +52,13 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@@ -158,8 -144,8 +160,8 @@@ public class GridMapQueryExecutor
if (nodeRess == null)
return;
- for (QueryResults ress : nodeRess.values())
- ress.cancel();
+ for (QueryResults ress : nodeRess.results().values())
- ress.cancel();
++ ress.cancel(true);
}
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@@ -237,7 -208,7 +239,7 @@@
if (results == null)
return;
-- results.cancel();
++ results.cancel(true);
}
/**
@@@ -409,226 -393,132 +411,231 @@@
* @param req Query request.
*/
private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
- ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
+ List<Integer> cacheIds;
- QueryResults qr = null;
+ if (req.extraSpaces() != null) {
+ cacheIds = new ArrayList<>(req.extraSpaces().size() + 1);
- List<GridReservable> reserved = new ArrayList<>();
+ cacheIds.add(CU.cacheId(req.space()));
- try {
- // Unmarshall query params.
- Collection<GridCacheSqlQuery> qrys;
+ for (String extraSpace : req.extraSpaces())
+ cacheIds.add(CU.cacheId(extraSpace));
+ }
+ else
+ cacheIds = Collections.singletonList(CU.cacheId(req.space()));
+
+ onQueryRequest0(node,
+ req.requestId(),
+ req.queries(),
+ cacheIds,
+ req.topologyVersion(),
+ null,
+ req.partitions(),
+ null,
+ req.pageSize(),
- false);
++ false,
++ req.timeout());
+ }
- try {
- qrys = req.queries();
+ /**
+ * @param node Node.
+ * @param req Query request.
+ */
+ private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
+ Map<UUID,int[]> partsMap = req.partitions();
+ int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+ onQueryRequest0(node,
+ req.requestId(),
+ req.queries(),
+ req.caches(),
+ req.topologyVersion(),
+ partsMap,
+ parts,
+ req.tables(),
+ req.pageSize(),
- req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
++ req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
++ req.timeout());
+ }
- if (!node.isLocal()) {
- Marshaller m = ctx.config().getMarshaller();
+ /**
+ * @param node Node authored request.
+ * @param reqId Request ID.
+ * @param qrys Queries to execute.
+ * @param cacheIds Caches which will be affected by these queries.
+ * @param topVer Topology version.
+ * @param partsMap Partitions map for unstable topology.
+ * @param parts Explicit partitions for current node.
+ * @param tbls Tables.
+ * @param pageSize Page size.
+ * @param distributedJoins Can we expect distributed joins to be ran.
+ */
+ private void onQueryRequest0(
+ ClusterNode node,
+ long reqId,
+ Collection<GridCacheSqlQuery> qrys,
+ List<Integer> cacheIds,
+ AffinityTopologyVersion topVer,
+ Map<UUID, int[]> partsMap,
+ int[] parts,
+ Collection<String> tbls,
+ int pageSize,
- boolean distributedJoins
++ boolean distributedJoins,
++ int timeout
+ ) {
+ // Prepare to run queries.
+ GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0));
- for (GridCacheSqlQuery qry : qrys)
- qry.unmarshallParams(m, ctx);
- }
- }
- catch (IgniteCheckedException e) {
- throw new CacheException("Failed to unmarshall parameters.", e);
- }
+ if (mainCctx == null)
+ throw new CacheException("Failed to find cache.");
+
+ NodeResults nodeRess = resultsForNode(node.id());
- List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
+ QueryResults qr = null;
- // Topology version can be null in rolling restart with previous version!
- final AffinityTopologyVersion topVer = req.topologyVersion();
+ List<GridReservable> reserved = new ArrayList<>();
+ try {
if (topVer != null) {
// Reserve primary for topology version or explicit partitions.
- if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
- sendRetry(node, req.requestId());
+ if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+ sendRetry(node, reqId);
return;
}
}
- // Prepare to run queries.
- GridCacheContext<?,?> mainCctx = cacheContext(req.space());
+ qr = new QueryResults(reqId, qrys.size(), mainCctx);
- if (mainCctx == null)
- throw new CacheException("Failed to find cache: " + req.space());
+ if (nodeRess.results().put(reqId, qr) != null)
+ throw new IllegalStateException();
- qr = new QueryResults(req.requestId(), qrys.size(), mainCctx);
+ // Prepare query context.
+ GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
+ node.id(),
+ reqId,
+ mainCctx.isReplicated() ? REPLICATED : MAP)
+ .filter(h2.backupFilter(topVer, parts))
+ .partitionsMap(partsMap)
+ .distributedJoins(distributedJoins)
+ .pageSize(pageSize)
+ .topologyVersion(topVer)
+ .reservations(reserved);
- if (nodeRess.put(req.requestId(), qr) != null)
- throw new IllegalStateException();
+ List<GridH2Table> snapshotedTbls = null;
- h2.setFilters(h2.backupFilter(caches, topVer, req.partitions()));
+ if (!F.isEmpty(tbls)) {
+ snapshotedTbls = new ArrayList<>(tbls.size());
- // TODO Prepare snapshots for all the needed tables before the run.
+ for (String identifier : tbls) {
+ GridH2Table tbl = h2.dataTable(identifier);
- // Run queries.
- int i = 0;
+ Objects.requireNonNull(tbl, identifier);
- for (GridCacheSqlQuery qry : qrys) {
- ResultSet rs = h2.executeSqlQueryWithTimer(req.space(),
- h2.connectionForSpace(req.space()),
- qry.query(),
- F.asList(qry.parameters()),
- true,
- req.timeout(),
- qr.cancels[i]);
+ tbl.snapshotIndexes(qctx);
- if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- node,
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- mainCctx.namex(),
- null,
- qry.query(),
- null,
- null,
- qry.parameters(),
- node.id(),
- null));
+ snapshotedTbls.add(tbl);
}
+ }
- assert rs instanceof JdbcResultSet : rs.getClass();
+ Connection conn = h2.connectionForSpace(mainCctx.name());
- qr.addResult(i, qry, node.id(), rs);
+ // Here we enforce join order to have the same behavior on all the nodes.
+ h2.setupConnection(conn, distributedJoins, true);
- if (qr.canceled) {
- qr.result(i).close();
+ GridH2QueryContext.set(qctx);
- return;
+ // qctx is set, we have to release reservations inside of it.
+ reserved = null;
+
+ try {
+ if (nodeRess.cancelled(reqId)) {
+ GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
+
+ nodeRess.results().remove(reqId);
+
- return;
++ throw new QueryCancelledException();
}
- // Send the first page.
- sendNextPage(nodeRess, node, qr, i, req.pageSize());
+ // Run queries.
+ int i = 0;
+
+ boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+
+ for (GridCacheSqlQuery qry : qrys) {
+ ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
- F.asList(qry.parameters()), true);
++ F.asList(qry.parameters()), true,
++ timeout,
++ qr.cancels[i]);
+
+ if (evt) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ node,
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ mainCctx.namex(),
+ null,
+ qry.query(),
+ null,
+ null,
+ qry.parameters(),
+ node.id(),
+ null));
+ }
+
+ assert rs instanceof JdbcResultSet : rs.getClass();
+
+ qr.addResult(i, qry, node.id(), rs);
+
+ if (qr.canceled) {
+ qr.result(i).close();
- return;
- i++;
++ throw new QueryCancelledException();
+ }
+
+ // Send the first page.
+ sendNextPage(nodeRess, node, qr, i, pageSize);
+
+ i++;
+ }
+ }
+ finally {
+ GridH2QueryContext.clearThreadLocal();
+
+ if (!distributedJoins)
+ qctx.clearContext(false);
+
+ if (!F.isEmpty(snapshotedTbls)) {
+ for (GridH2Table dataTbl : snapshotedTbls)
+ dataTbl.releaseSnapshots();
+ }
}
}
catch (Throwable e) {
if (qr != null) {
- nodeRess.remove(req.requestId(), qr);
+ nodeRess.results().remove(reqId, qr);
-- qr.cancel();
++ qr.cancel(false);
}
- U.error(log, "Failed to execute local query: " + req, e);
+ if (X.hasCause(e, GridH2RetryException.class))
+ sendRetry(node, reqId);
+ else {
+ U.error(log, "Failed to execute local query.", e);
- sendError(node, req.requestId(), e);
+ sendError(node, reqId, e);
- if (e instanceof Error)
- throw (Error)e;
+ if (e instanceof Error)
+ throw (Error)e;
+ }
}
finally {
- h2.setFilters(null);
-
- // Release reserved partitions.
- for (GridReservable r : reserved)
- r.release();
-
- // Ensure all cancels state is correct.
- if (qr != null)
- for (int i = 0; i < qr.cancels.length; i++) {
- GridQueryCancel cancel = qr.cancels[i];
-
- if (cancel != null)
- cancel.setCompleted();
- }
+ if (reserved != null) {
+ // Release reserved partitions.
+ for (int i = 0; i < reserved.size(); i++)
+ reserved.get(i).release();
+ }
}
}
@@@ -661,12 -548,12 +668,24 @@@
* @param req Request.
*/
private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
- ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+ NodeResults nodeRess = qryRess.get(node.id());
+
- QueryResults qr = nodeRess == null ? null : nodeRess.results().get(req.queryRequestId());
++ if (nodeRess == null) {
++ sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
- if (qr == null || qr.canceled)
- QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
++ return;
++ } else if (nodeRess.cancelled(req.queryRequestId())) {
++ sendError(node, req.queryRequestId(), new QueryCancelledException());
++
++ return;
++ }
+
- if (qr == null || qr.canceled)
++ QueryResults qr = nodeRess.results().get(req.queryRequestId());
++
++ if (qr == null)
sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
++ else if (qr.canceled)
++ sendError(node, req.queryRequestId(), new QueryCancelledException());
else
sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
}
@@@ -854,9 -705,9 +880,9 @@@
}
/**
- *
+ * Cancels the query.
*/
-- void cancel() {
++ void cancel(boolean forceQryCancel) {
if (canceled)
return;
@@@ -865,8 -716,16 +891,18 @@@
for (int i = 0; i < results.length(); i++) {
QueryResult res = results.get(i);
- if (res != null)
+ if (res != null) {
res.close();
+
+ continue;
+ }
+
- GridQueryCancel cancel = cancels[i];
++ if (forceQryCancel) {
++ GridQueryCancel cancel = cancels[i];
+
- if (cancel != null)
- cancel.cancel();
++ if (cancel != null)
++ cancel.cancel();
++ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 04449ac,3fdbf42..3847373
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -46,6 -45,6 +45,7 @@@ import org.apache.ignite.IgniteCheckedE
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
++import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@@ -64,9 -62,10 +64,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
+ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
@@@ -74,14 -73,11 +75,15 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.F;
++import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
@@@ -468,16 -456,13 +476,20 @@@ public class GridReduceQueryExecutor
/**
* @param cctx Cache context.
* @param qry Query.
- * @param keepBinary Keep binary.
+ * @param keepPortable Keep portable.
+ * @param enforceJoinOrder Enforce join order of tables.
- * @return Cursor.
+ * @param timeoutMillis Timeout in milliseconds.
+ * @param cancel Query cancel.
+ * @return Rows iterator.
*/
- public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary,
- int timeoutMillis, GridQueryCancel cancel) {
+ public Iterator<List<?>> query(
+ GridCacheContext<?, ?> cctx,
+ GridCacheTwoStepQuery qry,
+ boolean keepPortable,
- boolean enforceJoinOrder
++ boolean enforceJoinOrder,
++ int timeoutMillis,
++ GridQueryCancel cancel
+ ) {
for (int attempt = 0;; attempt++) {
if (attempt != 0) {
try {
@@@ -579,39 -575,27 +595,49 @@@
mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters()));
}
- boolean retry = false;
-
- if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
- Marshaller m = ctx.config().getMarshaller();
+ IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer);
- for (GridCacheSqlQuery mapQry : mapQrys)
- mapQry.marshallParams(m);
- }
+ final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
+ final boolean distributedJoins = qry.distributedJoins();
+ cancel.set(new Runnable() {
+ @Override public void run() {
- send(finalNodes, new GridQueryCancelRequest(qryReqId), null);
++ send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
+ }
+ });
+
+ boolean retry = false;
+
+ if (oldStyle && distributedJoins)
+ throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
+
if (send(nodes,
- new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null, timeoutMillis), partsMap)) {
+ oldStyle ?
+ new GridQueryRequest(qryReqId,
+ r.pageSize,
+ space,
+ mapQrys,
+ topVer,
+ extraSpaces(space, qry.spaces()),
- null) :
++ null,
++ timeoutMillis) :
+ new GridH2QueryRequest()
+ .requestId(qryReqId)
+ .topologyVersion(topVer)
+ .pageSize(r.pageSize)
+ .caches(qry.caches())
+ .tables(distributedJoins ? qry.tables() : null)
+ .partitions(convert(partsMap))
+ .queries(mapQrys)
- .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0),
++ .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
++ .timeout(timeoutMillis),
+ oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
+ distributedJoins)
- ) {
++ ) {
awaitAllReplies(r, nodes);
+ cancel.checkCancelled();
+
Object state = r.state.get();
if (state != null) {
@@@ -663,30 -653,20 +692,34 @@@
resIter = res.iterator();
}
else {
+ cancel.checkCancelled();
+
- GridCacheSqlQuery rdc = qry.reduceQuery();
+ UUID locNodeId = ctx.localNodeId();
+
+ h2.setupConnection(r.conn, false, enforceJoinOrder);
- // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
- ResultSet res = h2.executeSqlQueryWithTimer(space,
- r.conn,
- rdc.query(),
- F.asList(rdc.parameters()),
- false,
- timeoutMillis,
- cancel);
+ GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
+ .pageSize(r.pageSize).distributedJoins(false));
- resIter = new Iter(res);
+ try {
+ if (qry.explain())
+ return explainPlan(r.conn, space, qry);
+
+ GridCacheSqlQuery rdc = qry.reduceQuery();
+
+ ResultSet res = h2.executeSqlQueryWithTimer(space,
+ r.conn,
+ rdc.query(),
+ F.asList(rdc.parameters()),
- false);
++ false,
++ timeoutMillis,
++ cancel);
+
+ resIter = new Iter(res);
+ }
+ finally {
+ GridH2QueryContext.clearThreadLocal();
+ }
}
}
@@@ -699,16 -677,7 +730,7 @@@
continue;
}
- final Collection<ClusterNode> finalNodes = nodes;
-
- return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable) {
- @Override public void close() throws Exception {
- super.close();
-
- if (distributedJoins || !allIndexesFetched(r.idxs))
- send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
- }
- };
- return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary);
++ return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
}
catch (IgniteCheckedException | RuntimeException e) {
U.closeQuiet(r.conn);
@@@ -741,19 -717,33 +770,43 @@@
}
/**
+ * @param idxs Merge indexes.
+ * @return {@code true} If all remote data was fetched.
+ */
+ private static boolean allIndexesFetched(List<GridMergeIndex> idxs) {
+ for (int i = 0; i < idxs.size(); i++) {
+ if (!idxs.get(i).fetchedAll())
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns true if the exception is triggered by query cancel.
+ *
+ * @param e Exception.
+ * @return {@code true} if exception is caused by cancel.
+ */
+ private boolean wasCancelled(CacheException e) {
- return e.getSuppressed() != null && e.getSuppressed().length > 0 &&
- e.getSuppressed()[0] instanceof QueryCancelledException;
++ return X.hasSuppressed(e, QueryCancelledException.class);
+ }
+
+ /**
- * Explicitly cancels remote queries.
- * @param nodes Nodes.
+ * @param r Query run.
+ * @param qryReqId Query id.
+ */
+ private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes, QueryRun r, long qryReqId) {
+ for (GridMergeIndex idx : r.idxs) {
+ if (!idx.fetchedAll()) {
- send(nodes, new GridQueryCancelRequest(qryReqId), null);
++ send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
+
+ break;
+ }
+ }
+ }
+
+ /**
* @param r Query run.
* @param nodes Nodes to check periodically if they alive.
* @throws IgniteInterruptedCheckedException If interrupted.
@@@ -1290,7 -1259,7 +1345,7 @@@
latch.countDown();
for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-- idx.fail(nodeId);
++ idx.fail(nodeId, o instanceof CacheException ? (CacheException) o : null);
}
/**
@@@ -1332,24 -1301,4 +1387,24 @@@
return res;
}
}
+
+ /**
+ *
+ */
+ private class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode,Message,Message> {
+ /** */
+ private final Map<ClusterNode,IntArray> partsMap;
+
+ /**
+ * @param partsMap Partitions map.
+ */
+ private ExplicitPartitionsSpecializer(Map<ClusterNode,IntArray> partsMap) {
+ this.partsMap = partsMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Message apply(ClusterNode n, Message msg) {
+ return copy(msg, n, partsMap);
+ }
+ }
- }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index 0000000,0000000..be34a09
new file mode 100644
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@@ -1,0 -1,0 +1,7 @@@
++package org.apache.ignite.internal.processors.cache.distributed.near;
++
++/**
++ * Created by vozerov on 31.10.2016.
++ */
++public class IgniteCacheQueryAbstractDistributedJoinSelfTest {
++}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 0000000,0000000..80bd62e
new file mode 100644
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@@ -1,0 -1,0 +1,7 @@@
++package org.apache.ignite.internal.processors.cache.distributed.near;
++
++/**
++ * Created by vozerov on 31.10.2016.
++ */
++public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest {
++}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 760ee19,6e493ea..bcf8f9d
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@@ -349,8 -348,8 +349,8 @@@ public abstract class GridIndexingSpiAb
// Fields query
GridQueryFieldsResult fieldsRes =
- spi.execute("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " +
- "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, 0, null);
+ spi.queryLocalSqlFields("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " +
- "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false);
++ "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null);
String[] aliases = {"N1", "A1", "N2", "A2"};
Object[] vals = { "Valera", 19, "Kolya", 25};
@@@ -451,8 -450,7 +451,8 @@@
time = now;
range *= 3;
- GridQueryFieldsResult res = spi.execute("A", sql, Arrays.<Object>asList(1, range), null, 0, null);
+ GridQueryFieldsResult res = spi.queryLocalSqlFields("A", sql, Arrays.<Object>asList(1, range), null,
- false);
++ false, 0, null);
assert res.iterator().hasNext();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 568f880,9128f76..5722c01
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@@ -39,7 -41,12 +41,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
+ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQueryCancelOrTimeoutSelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryReplicatedSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;