You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/01/10 13:05:41 UTC
[ignite] branch master updated: IGNITE-16223 Log running query id. (#9704)
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 77b1357c IGNITE-16223 Log running query id. (#9704)
77b1357c is described below
commit 77b1357c913984a43facc654200dc6053ed8b736
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Mon Jan 10 16:05:20 2022 +0300
IGNITE-16223 Log running query id. (#9704)
---
.../qa/query/WarningOnBigQueryResultsBaseTest.java | 15 +++---
.../processors/bulkload/BulkLoadProcessor.java | 4 +-
.../processors/query/RunningQueryManager.java | 20 ++++---
.../cache/query/RegisteredQueryCursor.java | 6 +--
.../processors/query/h2/CommandProcessor.java | 4 +-
.../query/h2/H2QueryFetchSizeInterceptor.java | 4 +-
.../internal/processors/query/h2/H2QueryInfo.java | 45 ++++++++++------
.../processors/query/h2/IgniteH2Indexing.java | 61 +++++++++++++++++-----
.../processors/query/h2/MapH2QueryInfo.java | 18 +++----
.../processors/query/h2/ReduceH2QueryInfo.java | 7 ++-
.../query/h2/twostep/GridMapQueryExecutor.java | 6 ++-
.../query/h2/twostep/GridReduceQueryExecutor.java | 8 ++-
.../query/h2/twostep/msg/GridH2QueryRequest.java | 40 +++++++++++++-
13 files changed, 171 insertions(+), 67 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
index 70f06c4..e463cb6 100644
--- a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.GridLogThrottle;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
@@ -63,8 +64,8 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
/** Log message pattern. */
private static final Pattern logPtrn = Pattern.compile(
- "fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE), distributedJoin=(true|false), enforceJoinOrder=(true|false), " +
- "lazy=(true|false), schema=(\\S+), sql");
+ "fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE), distributedJoin=(true|false), " +
+ "enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+), sql");
/** Test log. */
private static Map<String, BigResultsLogListener> logListeners = new HashMap<>();
@@ -124,6 +125,9 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
+ // Negative timeout to disable a throttling of the huge results warning messages.
+ GridLogThrottle.throttleTimeout(-1);
+
// Starts the first node.
startGrid(0);
@@ -162,6 +166,8 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
+ GridLogThrottle.throttleTimeout(GridLogThrottle.DFLT_THROTTLE_TIMEOUT);
+
super.afterTestsStopped();
}
@@ -269,10 +275,7 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
schema = m.group(7);
sql = s.substring(s.indexOf(", sql='") + 7, s.indexOf("', plan="));
- if ("REDUCE".equals(type))
- plan = s.substring(s.indexOf("', plan=") + 8, s.indexOf(", reqId="));
- else
- plan = s.substring(s.indexOf("', plan=") + 8, s.indexOf(", node="));
+ plan = s.substring(s.indexOf("', plan=") + 8, s.indexOf(", reqId="));
assertTrue(sql.contains("SELECT"));
assertTrue(plan.contains("SELECT"));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
index f391772..af863ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
@@ -57,7 +57,7 @@ public class BulkLoadProcessor implements AutoCloseable {
private final RunningQueryManager runningQryMgr;
/** Query id. */
- private final Long qryId;
+ private final long qryId;
/** Exception, current load process ended with, or {@code null} if in progress or if succeded. */
private Exception failReason;
@@ -80,7 +80,7 @@ public class BulkLoadProcessor implements AutoCloseable {
* @param tracing Tracing processor.
*/
public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter,
- BulkLoadCacheWriter outputStreamer, RunningQueryManager runningQryMgr, Long qryId, Tracing tracing) {
+ BulkLoadCacheWriter outputStreamer, RunningQueryManager runningQryMgr, long qryId, Tracing tracing) {
this.inputParser = inputParser;
this.dataConverter = dataConverter;
this.outputStreamer = outputStreamer;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index d6ae245..8308e1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -67,6 +67,9 @@ public class RunningQueryManager {
/** */
public static final String SQL_QRY_HIST_VIEW_DESC = "SQL queries history.";
+ /** Undefined query ID value. */
+ public static final long UNDEFINED_QUERY_ID = 0L;
+
/** Keep registered user queries. */
private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>();
@@ -137,16 +140,16 @@ public class RunningQueryManager {
}
/**
- * Register running query.
+ * Registers running query and returns an id associated with the query.
*
* @param qry Query text.
* @param qryType Query type.
* @param schemaName Schema name.
* @param loc Local query flag.
* @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
- * @return Id of registered query.
+ * @return Id of registered query. Id is a positive number.
*/
- public Long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
+ public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
@Nullable GridQueryCancel cancel,
String qryInitiatorId) {
long qryId = qryIdGen.incrementAndGet();
@@ -185,8 +188,8 @@ public class RunningQueryManager {
* @param qryId id of the query, which is given by {@link #register register} method.
* @param failReason exception that caused query execution fail, or {@code null} if query succeded.
*/
- public void unregister(Long qryId, @Nullable Throwable failReason) {
- if (qryId == null)
+ public void unregister(long qryId, @Nullable Throwable failReason) {
+ if (qryId <= 0)
return;
boolean failed = failReason != null;
@@ -297,7 +300,7 @@ public class RunningQueryManager {
*
* @param qryId Query id.
*/
- public void cancel(Long qryId) {
+ public void cancel(long qryId) {
GridRunningQueryInfo run = runs.get(qryId);
if (run != null)
@@ -336,10 +339,11 @@ public class RunningQueryManager {
/**
* Gets info about running query by their id.
- * @param qryId
+ *
+ * @param qryId Query Id.
* @return Running query info or {@code null} in case no running query for given id.
*/
- public @Nullable GridRunningQueryInfo runningQueryInfo(Long qryId) {
+ public @Nullable GridRunningQueryInfo runningQueryInfo(long qryId) {
return runs.get(qryId);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
index 3945fec..4f00b5c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
@@ -50,7 +50,7 @@ public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
private RunningQueryManager runningQryMgr;
/** */
- private Long qryId;
+ private long qryId;
/** Exception caused query failed or {@code null} if it succeded. */
private Exception failReason;
@@ -70,11 +70,11 @@ public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
* @param tracing Tracing processor.
*/
public RegisteredQueryCursor(Iterable<T> iterExec, GridQueryCancel cancel, RunningQueryManager runningQryMgr,
- boolean lazy, Long qryId, Tracing tracing) {
+ boolean lazy, long qryId, Tracing tracing) {
super(iterExec, cancel, true, lazy);
assert runningQryMgr != null;
- assert qryId != null;
+ assert qryId != RunningQueryManager.UNDEFINED_QUERY_ID;
this.runningQryMgr = runningQryMgr;
this.qryId = qryId;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index 58ceeef..e5bf0b8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -407,7 +407,7 @@ public class CommandProcessor {
* @return Result.
*/
public CommandResult runCommand(String sql, SqlCommand cmdNative, GridSqlStatement cmdH2,
- QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
+ QueryParameters params, @Nullable SqlClientContext cliCtx, long qryId) throws IgniteCheckedException {
assert cmdNative != null || cmdH2 != null;
// Do execute.
@@ -1410,7 +1410,7 @@ public class CommandProcessor {
* @return The context (which is the result of the first request/response).
* @throws IgniteCheckedException If something failed.
*/
- private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
+ private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, long qryId)
throws IgniteCheckedException {
if (cmd.packetSize() == null)
cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
index cf47a7c..ebafc0c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
@@ -68,7 +68,7 @@ public class H2QueryFetchSizeInterceptor {
++fetchedSize;
if (threshold > 0 && fetchedSize >= threshold) {
- qryInfo.printLogMessage(log, "Query produced big result set. ",
+ qryInfo.printLogMessage(log, "Query produced big result set.",
"fetched=" + fetchedSize);
if (thresholdMult > 1)
@@ -85,7 +85,7 @@ public class H2QueryFetchSizeInterceptor {
*/
public void checkOnClose() {
if (bigResults) {
- qryInfo.printLogMessage(log, "Query produced big result set. ",
+ qryInfo.printLogMessage(log, "Query produced big result set.",
"fetched=" + fetchedSize);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index 2a62fc9..09afcbb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.query.h2;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.UUID;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,17 +59,27 @@ public class H2QueryInfo {
/** Prepared statement. */
private final Prepared stmt;
+ /** Originator node uid. */
+ private final UUID nodeId;
+
+ /** Query id. */
+ private final long queryId;
+
/**
* @param type Query type.
* @param stmt Query statement.
* @param sql Query statement.
+ * @param nodeId Originator node id.
+ * @param queryId Query id.
*/
- public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql) {
+ public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, UUID nodeId, long queryId) {
try {
assert stmt != null;
this.type = type;
this.sql = sql;
+ this.nodeId = nodeId;
+ this.queryId = queryId;
beginTs = U.currentTimeMillis();
@@ -106,22 +119,24 @@ public class H2QueryInfo {
* @param additionalInfo Additional query info.
*/
public void printLogMessage(IgniteLogger log, String msg, String additionalInfo) {
- StringBuilder msgSb = new StringBuilder(msg + " [");
-
- if (additionalInfo != null)
- msgSb.append(additionalInfo).append(", ");
-
- msgSb.append("duration=").append(time()).append("ms")
- .append(", type=").append(type)
- .append(", distributedJoin=").append(distributedJoin)
- .append(", enforceJoinOrder=").append(enforceJoinOrder)
- .append(", lazy=").append(lazy)
- .append(", schema=").append(schema);
+ StringBuilder msgSb = new StringBuilder(msg);
- msgSb.append(", sql='")
- .append(sql);
+ if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
+ msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
+ else
+ msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(nodeId, queryId));
- msgSb.append("', plan=").append(stmt.getPlanSQL());
+ if (additionalInfo != null)
+ msgSb.append(", ").append(additionalInfo);
+
+ msgSb.append(", duration=").append(time()).append("ms")
+ .append(", type=").append(type)
+ .append(", distributedJoin=").append(distributedJoin)
+ .append(", enforceJoinOrder=").append(enforceJoinOrder)
+ .append(", lazy=").append(lazy)
+ .append(", schema=").append(schema)
+ .append(", sql='").append(sql)
+ .append("', plan=").append(stmt.getPlanSQL());
printInfo(msgSb);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6b5cb0a..9b13d26 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -514,7 +514,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
- Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
+ long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
try {
return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
@@ -530,6 +530,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Queries individual fields (generally used by JDBC drivers).
*
+ * @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param select Select.
@@ -542,6 +543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
private GridQueryFieldsResult executeSelectLocal(
+ long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
@@ -608,7 +610,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2Utils.bindParameters(stmt, F.asList(params));
- H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry);
+ H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
+ ctx.localNodeId(), qryId);
ResultSet rs = executeSqlQueryWithTimer(
stmt,
@@ -731,7 +734,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@SuppressWarnings({"unchecked"})
private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml,
final Object[] args, String qryInitiatorId) throws IgniteCheckedException {
- Long qryId = runningQryMgr.register(
+ long qryId = runningQryMgr.register(
QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()),
GridCacheQueryType.SQL_FIELDS,
schemaName,
@@ -745,7 +748,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
UpdatePlan plan = dml.plan();
- Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(schemaName, plan, args),
+ Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(qryId, schemaName, plan, args),
objectContext(), true);
if (!iter.hasNext())
@@ -789,13 +792,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Calculates rows for update query.
*
+ * @param qryId Query id.
* @param schemaName Schema name.
* @param plan Update plan.
* @param args Statement arguments.
* @return Rows for update.
* @throws IgniteCheckedException If failed.
*/
- private Iterator<List<?>> updateQueryRows(String schemaName, UpdatePlan plan, Object[] args) throws IgniteCheckedException {
+ private Iterator<List<?>> updateQueryRows(long qryId, String schemaName, UpdatePlan plan, Object[] args)
+ throws IgniteCheckedException {
Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
if (!F.isEmpty(plan.selectQuery())) {
@@ -806,6 +811,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false);
GridQueryFieldsResult res = executeSelectLocal(
+ qryId,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
@@ -1057,7 +1063,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
- Long qryId = registerRunningQuery(qryDesc, qryParams, null, null);
+ long qryId = registerRunningQuery(qryDesc, qryParams, null, null);
CommandResult res = null;
@@ -1238,7 +1244,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
) {
IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
- Long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement());
+ long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement());
Exception failReason = null;
@@ -1252,6 +1258,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (!qryDesc.local()) {
return executeUpdateDistributed(
+ qryId,
qryDesc,
qryParams,
dml,
@@ -1260,6 +1267,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
else {
UpdateResult updRes = executeUpdate(
+ qryId,
qryDesc,
qryParams,
dml,
@@ -1322,7 +1330,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert cancel != null;
// Register query.
- Long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement());
+ long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement());
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CURSOR_OPEN, MTC.span()))) {
GridNearTxLocal tx = null;
@@ -1356,6 +1364,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int timeout = operationTimeout(qryParams.timeout(), tx);
Iterable<List<?>> iter = executeSelect0(
+ qryId,
qryDesc,
qryParams,
select,
@@ -1396,6 +1405,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute SELECT statement for DML.
*
+ * @param qryId Query id.
* @param schema Schema.
* @param selectQry Select query.
* @param mvccTracker MVCC tracker.
@@ -1405,6 +1415,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException On error.
*/
private QueryCursorImpl<List<?>> executeSelectForDml(
+ long qryId,
String schema,
SqlFieldsQuery selectQry,
MvccQueryTracker mvccTracker,
@@ -1418,6 +1429,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert select != null;
Iterable<List<?>> iter = executeSelect0(
+ qryId,
parseRes.queryDescriptor(),
parseRes.queryParameters(),
select,
@@ -1440,6 +1452,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
+ * @param qryId Query id.
* @param qryDesc Plan key.
* @param qryParams Parameters.
* @param select Select.
@@ -1452,6 +1465,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException On error.
*/
private Iterable<List<?>> executeSelect0(
+ long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
@@ -1477,6 +1491,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert twoStepQry != null;
iter = executeSelectDistributed(
+ qryId,
qryDesc,
qryParams,
twoStepQry,
@@ -1491,6 +1506,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
GridQueryFieldsResult res = executeSelectLocal(
+ qryId,
qryDesc,
qryParams,
select,
@@ -1586,7 +1602,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param stmnt Parsed statement.
* @return Id of registered query or {@code null} if query wasn't registered.
*/
- private Long registerRunningQuery(
+ private long registerRunningQuery(
QueryDescriptor qryDesc,
QueryParameters qryParams,
GridQueryCancel cancel,
@@ -1594,7 +1610,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
) {
String qry = QueryUtils.INCLUDE_SENSITIVE || stmnt == null ? qryDesc.sql() : sqlWithoutConst(stmnt);
- Long res = runningQryMgr.register(
+ long res = runningQryMgr.register(
qry,
GridCacheQueryType.SQL_FIELDS,
qryDesc.schemaName(),
@@ -1726,6 +1742,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// sub-query and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocalSubquery()) {
cur = executeSelectForDml(
+ RunningQueryManager.UNDEFINED_QUERY_ID,
schema,
selectFieldsQry,
new StaticMvccQueryTracker(planCctx, mvccSnapshot),
@@ -1739,6 +1756,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
QueryParserResult selectParseRes = parser.parse(schema, selectFieldsQry, false);
GridQueryFieldsResult res = executeSelectLocal(
+ RunningQueryManager.UNDEFINED_QUERY_ID,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
@@ -1767,6 +1785,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Run distributed query on detected set of partitions.
*
+ * @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param twoStepQry Two-step query.
@@ -1778,6 +1797,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
@SuppressWarnings("IfMayBeConditional")
private Iterable<List<?>> executeSelectDistributed(
+ final long qryId,
final QueryDescriptor qryDesc,
final QueryParameters qryParams,
final GridCacheTwoStepQuery twoStepQry,
@@ -1789,7 +1809,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// When explicit partitions are set, there must be an owning cache they should be applied to.
PartitionResult derivedParts = twoStepQry.derivedPartitions();
- final int parts[] = PartitionResult.calculatePartitions(
+ final int[] parts = PartitionResult.calculatePartitions(
qryParams.partitions(),
derivedParts,
qryParams.arguments()
@@ -1821,6 +1841,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Override public Iterator<List<?>> iterator() {
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
return IgniteH2Indexing.this.rdcQryExec.query(
+ qryId,
qryDesc.schemaName(),
twoStepQry,
keepBinary,
@@ -1875,6 +1896,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert dml != null;
return executeUpdate(
+ RunningQueryManager.UNDEFINED_QUERY_ID,
parseRes.queryDescriptor(),
parseRes.queryParameters(),
dml,
@@ -2099,7 +2121,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @SuppressWarnings({"deprecation"})
+ @SuppressWarnings({"deprecation", "AssignmentToStaticFieldFromInstanceMethod"})
@Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Starting cache query index...");
@@ -2549,6 +2571,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml DML statement.
@@ -2558,6 +2581,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
@SuppressWarnings("unchecked")
private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
+ long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
@@ -2601,6 +2625,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
res = executeUpdate(
+ qryId,
qryDesc,
qryParams.toSingleBatchedArguments(args),
dml,
@@ -2647,6 +2672,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
else {
UpdateResult res = executeUpdate(
+ qryId,
qryDesc,
qryParams,
dml,
@@ -2671,6 +2697,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
*
+ * @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml DML command.
@@ -2682,6 +2709,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
@SuppressWarnings("IfMayBeConditional")
private UpdateResult executeUpdate(
+ long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
@@ -2709,6 +2737,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
if (transactional)
r = executeUpdateTransactional(
+ qryId,
qryDesc,
qryParams,
dml,
@@ -2717,6 +2746,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
);
else
r = executeUpdateNonTransactional(
+ qryId,
qryDesc,
qryParams,
dml,
@@ -2751,6 +2781,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute update in non-transactional mode.
*
+ * @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml Plan.
@@ -2761,6 +2792,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
private UpdateResult executeUpdateNonTransactional(
+ long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
@@ -2825,6 +2857,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert !F.isEmpty(plan.selectQuery());
cur = executeSelectForDml(
+ qryId,
qryDesc.schemaName(),
selectFieldsQry,
null,
@@ -2840,6 +2873,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false);
final GridQueryFieldsResult res = executeSelectLocal(
+ qryId,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
@@ -2877,6 +2911,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute update in transactional mode.
*
+ * @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml Plan.
@@ -2886,6 +2921,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
private UpdateResult executeUpdateTransactional(
+ long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
@@ -2953,6 +2989,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
.setLazy(qryParams.lazy());
FieldsQueryCursor<List<?>> cur = executeSelectForDml(
+ qryId,
qryDesc.schemaName(),
selectFieldsQry,
MvccUtils.mvccTracker(cctx, tx),
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
index 5b8db67..5b1cccd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
@@ -18,15 +18,12 @@
package org.apache.ignite.internal.processors.query.h2;
import java.sql.PreparedStatement;
-import org.apache.ignite.cluster.ClusterNode;
+import java.util.UUID;
/**
* Map query info.
*/
public class MapH2QueryInfo extends H2QueryInfo {
- /** Node. */
- private final ClusterNode node;
-
/** Request id. */
private final long reqId;
@@ -36,23 +33,22 @@ public class MapH2QueryInfo extends H2QueryInfo {
/**
* @param stmt Query statement.
* @param sql Query statement.
- * @param node Originator node ID
+ * @param nodeId Originator node id.
+ * @param qryId Query id.
* @param reqId Request ID.
* @param segment Segment.
*/
- public MapH2QueryInfo(PreparedStatement stmt, String sql,
- ClusterNode node, long reqId, int segment) {
- super(QueryType.MAP, stmt, sql);
+ public MapH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId,
+ int segment) {
+ super(QueryType.MAP, stmt, sql, nodeId, qryId);
- this.node = node;
this.reqId = reqId;
this.segment = segment;
}
/** {@inheritDoc} */
@Override protected void printInfo(StringBuilder msg) {
- msg.append(", node=").append(node)
- .append(", reqId=").append(reqId)
+ msg.append(", reqId=").append(reqId)
.append(", segment=").append(segment);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
index 9474c6a..ea88594 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2;
import java.sql.PreparedStatement;
+import java.util.UUID;
/**
* Reduce query info.
@@ -29,10 +30,12 @@ public class ReduceH2QueryInfo extends H2QueryInfo {
/**
* @param stmt Query statement.
* @param sql Query statement.
+ * @param nodeId Originator node id.
+ * @param qryId Query id.
* @param reqId Request ID.
*/
- public ReduceH2QueryInfo(PreparedStatement stmt, String sql, long reqId) {
- super(QueryType.REDUCE, stmt, sql);
+ public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId) {
+ super(QueryType.REDUCE, stmt, sql, nodeId, qryId);
this.reqId = reqId;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ccf967b..d8727c2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -246,6 +246,7 @@ public class GridMapQueryExecutor {
(GridPlainCallable<Void>)() -> {
try (TraceSurroundings ignored = MTC.supportContinual(span)) {
onQueryRequest0(node,
+ req.queryId(),
req.requestId(),
segment,
req.schemaName(),
@@ -273,6 +274,7 @@ public class GridMapQueryExecutor {
}
onQueryRequest0(node,
+ req.queryId(),
req.requestId(),
singleSegment,
req.schemaName(),
@@ -300,6 +302,7 @@ public class GridMapQueryExecutor {
/**
* @param node Node authored request.
+ * @param qryId Query ID.
* @param reqId Request ID.
* @param segmentId index segment ID.
* @param schemaName Schema name.
@@ -320,6 +323,7 @@ public class GridMapQueryExecutor {
*/
private void onQueryRequest0(
final ClusterNode node,
+ final long qryId,
final long reqId,
final int segmentId,
final String schemaName,
@@ -454,7 +458,7 @@ public class GridMapQueryExecutor {
H2Utils.bindParameters(stmt, params0);
- MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node, reqId, segmentId);
+ MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);
ResultSet rs = h2.executeSqlQueryWithTimer(
stmt,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 2564196..4609600 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -320,6 +320,7 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param qryId Query ID.
* @param schemaName Schema name.
* @param qry Query.
* @param keepBinary Keep binary.
@@ -334,8 +335,9 @@ public class GridReduceQueryExecutor {
* @param pageSize Page size.
* @return Rows iterator.
*/
- @SuppressWarnings({"BusyWait", "IfMayBeConditional"})
+ @SuppressWarnings("IfMayBeConditional")
public Iterator<List<?>> query(
+ long qryId,
String schemaName,
final GridCacheTwoStepQuery qry,
boolean keepBinary,
@@ -434,6 +436,7 @@ public class GridReduceQueryExecutor {
cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true));
GridH2QueryRequest req = new GridH2QueryRequest()
+ .queryId(qryId)
.requestId(qryReqId)
.topologyVersion(topVer)
.pageSize(pageSize)
@@ -524,7 +527,8 @@ public class GridReduceQueryExecutor {
H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params)));
- ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), qryReqId);
+ ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(),
+ ctx.localNodeId(), qryId, qryReqId);
ResultSet res = h2.executeSqlQueryWithTimer(stmt,
conn,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index db1c901..bed036d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -160,6 +161,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** TX details holder for {@code SELECT FOR UPDATE}, or {@code null} if not applicable. */
private GridH2SelectForUpdateTxDetails txReq;
+ /** Id of the query assigned by {@link RunningQueryManager} on originator node. */
+ private long qryId;
+
/** */
private boolean explicitTimeout;
@@ -189,6 +193,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
schemaName = req.schemaName;
mvccSnapshot = req.mvccSnapshot;
txReq = req.txReq;
+ qryId = req.qryId;
explicitTimeout = req.explicitTimeout;
}
@@ -505,6 +510,27 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
}
/**
+ * Id of the query assigned by {@link RunningQueryManager} on originator node.
+ *
+ * @return Query id.
+ */
+ public long queryId() {
+ return qryId;
+ }
+
+ /**
+ * Sets id of the query assigned by {@link RunningQueryManager}.
+ *
+ * @param queryId Query id.
+ * @return {@code this} for chaining.
+ */
+ public GridH2QueryRequest queryId(long queryId) {
+ this.qryId = queryId;
+
+ return this;
+ }
+
+ /**
* Checks if data page scan enabled.
*
* @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
@@ -667,6 +693,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
writer.incrementState();
+ case 15:
+ if (!writer.writeLong("qryId", qryId))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -800,6 +831,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
+ case 15:
+ qryId = reader.readLong("qryId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -812,7 +850,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 16;
}
/** {@inheritDoc} */