You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/01/10 13:05:41 UTC

[ignite] branch master updated: IGNITE-16223 Log running query id. (#9704)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b1357c IGNITE-16223 Log running query id. (#9704)
77b1357c is described below

commit 77b1357c913984a43facc654200dc6053ed8b736
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Mon Jan 10 16:05:20 2022 +0300

    IGNITE-16223 Log running query id. (#9704)
---
 .../qa/query/WarningOnBigQueryResultsBaseTest.java | 15 +++---
 .../processors/bulkload/BulkLoadProcessor.java     |  4 +-
 .../processors/query/RunningQueryManager.java      | 20 ++++---
 .../cache/query/RegisteredQueryCursor.java         |  6 +--
 .../processors/query/h2/CommandProcessor.java      |  4 +-
 .../query/h2/H2QueryFetchSizeInterceptor.java      |  4 +-
 .../internal/processors/query/h2/H2QueryInfo.java  | 45 ++++++++++------
 .../processors/query/h2/IgniteH2Indexing.java      | 61 +++++++++++++++++-----
 .../processors/query/h2/MapH2QueryInfo.java        | 18 +++----
 .../processors/query/h2/ReduceH2QueryInfo.java     |  7 ++-
 .../query/h2/twostep/GridMapQueryExecutor.java     |  6 ++-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  8 ++-
 .../query/h2/twostep/msg/GridH2QueryRequest.java   | 40 +++++++++++++-
 13 files changed, 171 insertions(+), 67 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
index 70f06c4..e463cb6 100644
--- a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.GridLogThrottle;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -63,8 +64,8 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
 
     /** Log message pattern. */
     private static final Pattern logPtrn = Pattern.compile(
-        "fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE), distributedJoin=(true|false), enforceJoinOrder=(true|false), " +
-            "lazy=(true|false), schema=(\\S+), sql");
+        "fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE), distributedJoin=(true|false), " +
+            "enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+), sql");
 
     /** Test log. */
     private static Map<String, BigResultsLogListener> logListeners = new HashMap<>();
@@ -124,6 +125,9 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        // Negative timeout to disable a throttling of the huge results warning messages.
+        GridLogThrottle.throttleTimeout(-1);
+
         // Starts the first node.
         startGrid(0);
 
@@ -162,6 +166,8 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        GridLogThrottle.throttleTimeout(GridLogThrottle.DFLT_THROTTLE_TIMEOUT);
+
         super.afterTestsStopped();
     }
 
@@ -269,10 +275,7 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
                 schema = m.group(7);
 
                 sql = s.substring(s.indexOf(", sql='") + 7, s.indexOf("', plan="));
-                if ("REDUCE".equals(type))
-                    plan = s.substring(s.indexOf("', plan=") + 8, s.indexOf(", reqId="));
-                else
-                    plan = s.substring(s.indexOf("', plan=") + 8, s.indexOf(", node="));
+                plan = s.substring(s.indexOf("', plan=") + 8, s.indexOf(", reqId="));
 
                 assertTrue(sql.contains("SELECT"));
                 assertTrue(plan.contains("SELECT"));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
index f391772..af863ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
@@ -57,7 +57,7 @@ public class BulkLoadProcessor implements AutoCloseable {
     private final RunningQueryManager runningQryMgr;
 
     /** Query id. */
-    private final Long qryId;
+    private final long qryId;
 
     /** Exception, current load process ended with, or {@code null} if in progress or if succeded. */
     private Exception failReason;
@@ -80,7 +80,7 @@ public class BulkLoadProcessor implements AutoCloseable {
      * @param tracing Tracing processor.
      */
     public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter,
-        BulkLoadCacheWriter outputStreamer, RunningQueryManager runningQryMgr, Long qryId, Tracing tracing) {
+        BulkLoadCacheWriter outputStreamer, RunningQueryManager runningQryMgr, long qryId, Tracing tracing) {
         this.inputParser = inputParser;
         this.dataConverter = dataConverter;
         this.outputStreamer = outputStreamer;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
index d6ae245..8308e1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
@@ -67,6 +67,9 @@ public class RunningQueryManager {
     /** */
     public static final String SQL_QRY_HIST_VIEW_DESC = "SQL queries history.";
 
+    /** Undefined query ID value. */
+    public static final long UNDEFINED_QUERY_ID = 0L;
+
     /** Keep registered user queries. */
     private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>();
 
@@ -137,16 +140,16 @@ public class RunningQueryManager {
     }
 
     /**
-     * Register running query.
+     * Registers running query and returns an id associated with the query.
      *
      * @param qry Query text.
      * @param qryType Query type.
      * @param schemaName Schema name.
      * @param loc Local query flag.
      * @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
-     * @return Id of registered query.
+     * @return Id of registered query. Id is a positive number.
      */
-    public Long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
+    public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
         @Nullable GridQueryCancel cancel,
         String qryInitiatorId) {
         long qryId = qryIdGen.incrementAndGet();
@@ -185,8 +188,8 @@ public class RunningQueryManager {
      * @param qryId id of the query, which is given by {@link #register register} method.
      * @param failReason exception that caused query execution fail, or {@code null} if query succeded.
      */
-    public void unregister(Long qryId, @Nullable Throwable failReason) {
-        if (qryId == null)
+    public void unregister(long qryId, @Nullable Throwable failReason) {
+        if (qryId <= 0)
             return;
 
         boolean failed = failReason != null;
@@ -297,7 +300,7 @@ public class RunningQueryManager {
      *
      * @param qryId Query id.
      */
-    public void cancel(Long qryId) {
+    public void cancel(long qryId) {
         GridRunningQueryInfo run = runs.get(qryId);
 
         if (run != null)
@@ -336,10 +339,11 @@ public class RunningQueryManager {
 
     /**
      * Gets info about running query by their id.
-     * @param qryId
+     *
+     * @param qryId Query Id.
      * @return Running query info or {@code null} in case no running query for given id.
      */
-    public @Nullable GridRunningQueryInfo runningQueryInfo(Long qryId) {
+    public @Nullable GridRunningQueryInfo runningQueryInfo(long qryId) {
         return runs.get(qryId);
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
index 3945fec..4f00b5c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/RegisteredQueryCursor.java
@@ -50,7 +50,7 @@ public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
     private RunningQueryManager runningQryMgr;
 
     /** */
-    private Long qryId;
+    private long qryId;
 
     /** Exception caused query failed or {@code null} if it succeded. */
     private Exception failReason;
@@ -70,11 +70,11 @@ public class RegisteredQueryCursor<T> extends QueryCursorImpl<T> {
      * @param tracing Tracing processor.
      */
     public RegisteredQueryCursor(Iterable<T> iterExec, GridQueryCancel cancel, RunningQueryManager runningQryMgr,
-        boolean lazy, Long qryId, Tracing tracing) {
+        boolean lazy, long qryId, Tracing tracing) {
         super(iterExec, cancel, true, lazy);
 
         assert runningQryMgr != null;
-        assert qryId != null;
+        assert qryId != RunningQueryManager.UNDEFINED_QUERY_ID;
 
         this.runningQryMgr = runningQryMgr;
         this.qryId = qryId;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index 58ceeef..e5bf0b8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -407,7 +407,7 @@ public class CommandProcessor {
      * @return Result.
      */
     public CommandResult runCommand(String sql, SqlCommand cmdNative, GridSqlStatement cmdH2,
-        QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
+        QueryParameters params, @Nullable SqlClientContext cliCtx, long qryId) throws IgniteCheckedException {
         assert cmdNative != null || cmdH2 != null;
 
         // Do execute.
@@ -1410,7 +1410,7 @@ public class CommandProcessor {
      * @return The context (which is the result of the first request/response).
      * @throws IgniteCheckedException If something failed.
      */
-    private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
+    private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, long qryId)
         throws IgniteCheckedException {
         if (cmd.packetSize() == null)
             cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
index cf47a7c..ebafc0c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
@@ -68,7 +68,7 @@ public class H2QueryFetchSizeInterceptor {
         ++fetchedSize;
 
         if (threshold > 0 && fetchedSize >= threshold) {
-            qryInfo.printLogMessage(log, "Query produced big result set. ",
+            qryInfo.printLogMessage(log, "Query produced big result set.",
                 "fetched=" + fetchedSize);
 
             if (thresholdMult > 1)
@@ -85,7 +85,7 @@ public class H2QueryFetchSizeInterceptor {
      */
     public void checkOnClose() {
         if (bigResults) {
-            qryInfo.printLogMessage(log, "Query produced big result set. ",
+            qryInfo.printLogMessage(log, "Query produced big result set.",
                 "fetched=" + fetchedSize);
         }
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index 2a62fc9..09afcbb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.query.h2;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.UUID;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,17 +59,27 @@ public class H2QueryInfo {
     /** Prepared statement. */
     private final Prepared stmt;
 
+    /** Originator node uid. */
+    private final UUID nodeId;
+
+    /** Query id. */
+    private final long queryId;
+
     /**
      * @param type Query type.
      * @param stmt Query statement.
      * @param sql Query statement.
+     * @param nodeId Originator node id.
+     * @param queryId Query id.
      */
-    public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql) {
+    public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, UUID nodeId, long queryId) {
         try {
             assert stmt != null;
 
             this.type = type;
             this.sql = sql;
+            this.nodeId = nodeId;
+            this.queryId = queryId;
 
             beginTs = U.currentTimeMillis();
 
@@ -106,22 +119,24 @@ public class H2QueryInfo {
      * @param additionalInfo Additional query info.
      */
     public void printLogMessage(IgniteLogger log, String msg, String additionalInfo) {
-        StringBuilder msgSb = new StringBuilder(msg + " [");
-
-        if (additionalInfo != null)
-            msgSb.append(additionalInfo).append(", ");
-
-        msgSb.append("duration=").append(time()).append("ms")
-            .append(", type=").append(type)
-            .append(", distributedJoin=").append(distributedJoin)
-            .append(", enforceJoinOrder=").append(enforceJoinOrder)
-            .append(", lazy=").append(lazy)
-            .append(", schema=").append(schema);
+        StringBuilder msgSb = new StringBuilder(msg);
 
-        msgSb.append(", sql='")
-            .append(sql);
+        if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
+            msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
+        else
+            msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(nodeId, queryId));
 
-        msgSb.append("', plan=").append(stmt.getPlanSQL());
+        if (additionalInfo != null)
+            msgSb.append(", ").append(additionalInfo);
+
+        msgSb.append(", duration=").append(time()).append("ms")
+                .append(", type=").append(type)
+                .append(", distributedJoin=").append(distributedJoin)
+                .append(", enforceJoinOrder=").append(enforceJoinOrder)
+                .append(", lazy=").append(lazy)
+                .append(", schema=").append(schema)
+                .append(", sql='").append(sql)
+                .append("', plan=").append(stmt.getPlanSQL());
 
         printInfo(msgSb);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6b5cb0a..9b13d26 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -514,7 +514,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
-            Long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
+            long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
 
             try {
                 return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
@@ -530,6 +530,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *
+     * @param qryId Query id.
      * @param qryDesc Query descriptor.
      * @param qryParams Query parameters.
      * @param select Select.
@@ -542,6 +543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     private GridQueryFieldsResult executeSelectLocal(
+        long qryId,
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         QueryParserResultSelect select,
@@ -608,7 +610,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                         H2Utils.bindParameters(stmt, F.asList(params));
 
-                        H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry);
+                        H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
+                            ctx.localNodeId(), qryId);
 
                         ResultSet rs = executeSqlQueryWithTimer(
                             stmt,
@@ -731,7 +734,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @SuppressWarnings({"unchecked"})
     private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml,
         final Object[] args, String qryInitiatorId) throws IgniteCheckedException {
-        Long qryId = runningQryMgr.register(
+        long qryId = runningQryMgr.register(
             QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()),
             GridCacheQueryType.SQL_FIELDS,
             schemaName,
@@ -745,7 +748,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         try {
             UpdatePlan plan = dml.plan();
 
-            Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(schemaName, plan, args),
+            Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(qryId, schemaName, plan, args),
                 objectContext(), true);
 
             if (!iter.hasNext())
@@ -789,13 +792,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Calculates rows for update query.
      *
+     * @param qryId Query id.
      * @param schemaName Schema name.
      * @param plan Update plan.
      * @param args Statement arguments.
      * @return Rows for update.
      * @throws IgniteCheckedException If failed.
      */
-    private Iterator<List<?>> updateQueryRows(String schemaName, UpdatePlan plan, Object[] args) throws IgniteCheckedException {
+    private Iterator<List<?>> updateQueryRows(long qryId, String schemaName, UpdatePlan plan, Object[] args)
+        throws IgniteCheckedException {
         Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
 
         if (!F.isEmpty(plan.selectQuery())) {
@@ -806,6 +811,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false);
 
             GridQueryFieldsResult res = executeSelectLocal(
+                qryId,
                 selectParseRes.queryDescriptor(),
                 selectParseRes.queryParameters(),
                 selectParseRes.select(),
@@ -1057,7 +1063,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
         }
 
-        Long qryId = registerRunningQuery(qryDesc, qryParams, null, null);
+        long qryId = registerRunningQuery(qryDesc, qryParams, null, null);
 
         CommandResult res = null;
 
@@ -1238,7 +1244,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     ) {
         IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
 
-        Long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement());
+        long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement());
 
         Exception failReason = null;
 
@@ -1252,6 +1258,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             if (!qryDesc.local()) {
                 return executeUpdateDistributed(
+                    qryId,
                     qryDesc,
                     qryParams,
                     dml,
@@ -1260,6 +1267,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             }
             else {
                 UpdateResult updRes = executeUpdate(
+                    qryId,
                     qryDesc,
                     qryParams,
                     dml,
@@ -1322,7 +1330,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert cancel != null;
 
         // Register query.
-        Long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement());
+        long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement());
 
         try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CURSOR_OPEN, MTC.span()))) {
             GridNearTxLocal tx = null;
@@ -1356,6 +1364,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             int timeout = operationTimeout(qryParams.timeout(), tx);
 
             Iterable<List<?>> iter = executeSelect0(
+                qryId,
                 qryDesc,
                 qryParams,
                 select,
@@ -1396,6 +1405,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute SELECT statement for DML.
      *
+     * @param qryId Query id.
      * @param schema Schema.
      * @param selectQry Select query.
      * @param mvccTracker MVCC tracker.
@@ -1405,6 +1415,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException On error.
      */
     private QueryCursorImpl<List<?>> executeSelectForDml(
+        long qryId,
         String schema,
         SqlFieldsQuery selectQry,
         MvccQueryTracker mvccTracker,
@@ -1418,6 +1429,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert select != null;
 
         Iterable<List<?>> iter = executeSelect0(
+            qryId,
             parseRes.queryDescriptor(),
             parseRes.queryParameters(),
             select,
@@ -1440,6 +1452,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute an all-ready {@link SqlFieldsQuery}.
      *
+     * @param qryId Query id.
      * @param qryDesc Plan key.
      * @param qryParams Parameters.
      * @param select Select.
@@ -1452,6 +1465,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException On error.
      */
     private Iterable<List<?>> executeSelect0(
+        long qryId,
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         QueryParserResultSelect select,
@@ -1477,6 +1491,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             assert twoStepQry != null;
 
             iter = executeSelectDistributed(
+                qryId,
                 qryDesc,
                 qryParams,
                 twoStepQry,
@@ -1491,6 +1506,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
 
             GridQueryFieldsResult res = executeSelectLocal(
+                qryId,
                 qryDesc,
                 qryParams,
                 select,
@@ -1586,7 +1602,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param stmnt Parsed statement.
      * @return Id of registered query or {@code null} if query wasn't registered.
      */
-    private Long registerRunningQuery(
+    private long registerRunningQuery(
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         GridQueryCancel cancel,
@@ -1594,7 +1610,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     ) {
         String qry = QueryUtils.INCLUDE_SENSITIVE || stmnt == null ? qryDesc.sql() : sqlWithoutConst(stmnt);
 
-        Long res = runningQryMgr.register(
+        long res = runningQryMgr.register(
             qry,
             GridCacheQueryType.SQL_FIELDS,
             qryDesc.schemaName(),
@@ -1726,6 +1742,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         // sub-query and not some dummy stuff like "select 1, 2, 3;"
         if (!loc && !plan.isLocalSubquery()) {
             cur = executeSelectForDml(
+                RunningQueryManager.UNDEFINED_QUERY_ID,
                 schema,
                 selectFieldsQry,
                 new StaticMvccQueryTracker(planCctx, mvccSnapshot),
@@ -1739,6 +1756,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             QueryParserResult selectParseRes = parser.parse(schema, selectFieldsQry, false);
 
             GridQueryFieldsResult res = executeSelectLocal(
+                RunningQueryManager.UNDEFINED_QUERY_ID,
                 selectParseRes.queryDescriptor(),
                 selectParseRes.queryParameters(),
                 selectParseRes.select(),
@@ -1767,6 +1785,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Run distributed query on detected set of partitions.
      *
+     * @param qryId Query id.
      * @param qryDesc Query descriptor.
      * @param qryParams Query parameters.
      * @param twoStepQry Two-step query.
@@ -1778,6 +1797,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     @SuppressWarnings("IfMayBeConditional")
     private Iterable<List<?>> executeSelectDistributed(
+        final long qryId,
         final QueryDescriptor qryDesc,
         final QueryParameters qryParams,
         final GridCacheTwoStepQuery twoStepQry,
@@ -1789,7 +1809,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         // When explicit partitions are set, there must be an owning cache they should be applied to.
         PartitionResult derivedParts = twoStepQry.derivedPartitions();
 
-        final int parts[] = PartitionResult.calculatePartitions(
+        final int[] parts = PartitionResult.calculatePartitions(
             qryParams.partitions(),
             derivedParts,
             qryParams.arguments()
@@ -1821,6 +1841,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 @Override public Iterator<List<?>> iterator() {
                     try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
                         return IgniteH2Indexing.this.rdcQryExec.query(
+                            qryId,
                             qryDesc.schemaName(),
                             twoStepQry,
                             keepBinary,
@@ -1875,6 +1896,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert dml != null;
 
         return executeUpdate(
+            RunningQueryManager.UNDEFINED_QUERY_ID,
             parseRes.queryDescriptor(),
             parseRes.queryParameters(),
             dml,
@@ -2099,7 +2121,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"deprecation"})
+    @SuppressWarnings({"deprecation", "AssignmentToStaticFieldFromInstanceMethod"})
     @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Starting cache query index...");
@@ -2549,6 +2571,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param qryId Query id.
      * @param qryDesc Query descriptor.
      * @param qryParams Query parameters.
      * @param dml DML statement.
@@ -2558,6 +2581,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     @SuppressWarnings("unchecked")
     private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
+        long qryId,
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         QueryParserResultDml dml,
@@ -2601,6 +2625,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     try {
                         res = executeUpdate(
+                            qryId,
                             qryDesc,
                             qryParams.toSingleBatchedArguments(args),
                             dml,
@@ -2647,6 +2672,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
         else {
             UpdateResult res = executeUpdate(
+                qryId,
                 qryDesc,
                 qryParams,
                 dml,
@@ -2671,6 +2697,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
      *
+     * @param qryId Query id.
      * @param qryDesc Query descriptor.
      * @param qryParams Query parameters.
      * @param dml DML command.
@@ -2682,6 +2709,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     @SuppressWarnings("IfMayBeConditional")
     private UpdateResult executeUpdate(
+        long qryId,
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         QueryParserResultDml dml,
@@ -2709,6 +2737,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             try {
                 if (transactional)
                     r = executeUpdateTransactional(
+                        qryId,
                         qryDesc,
                         qryParams,
                         dml,
@@ -2717,6 +2746,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     );
                 else
                     r = executeUpdateNonTransactional(
+                        qryId,
                         qryDesc,
                         qryParams,
                         dml,
@@ -2751,6 +2781,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute update in non-transactional mode.
      *
+     * @param qryId Query id.
      * @param qryDesc Query descriptor.
      * @param qryParams Query parameters.
      * @param dml Plan.
@@ -2761,6 +2792,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     private UpdateResult executeUpdateNonTransactional(
+        long qryId,
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         QueryParserResultDml dml,
@@ -2825,6 +2857,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             assert !F.isEmpty(plan.selectQuery());
 
             cur = executeSelectForDml(
+                qryId,
                 qryDesc.schemaName(),
                 selectFieldsQry,
                 null,
@@ -2840,6 +2873,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false);
 
             final GridQueryFieldsResult res = executeSelectLocal(
+                qryId,
                 selectParseRes.queryDescriptor(),
                 selectParseRes.queryParameters(),
                 selectParseRes.select(),
@@ -2877,6 +2911,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute update in transactional mode.
      *
+     * @param qryId Query id.
      * @param qryDesc Query descriptor.
      * @param qryParams Query parameters.
      * @param dml Plan.
@@ -2886,6 +2921,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     private UpdateResult executeUpdateTransactional(
+        long qryId,
         QueryDescriptor qryDesc,
         QueryParameters qryParams,
         QueryParserResultDml dml,
@@ -2953,6 +2989,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         .setLazy(qryParams.lazy());
 
                     FieldsQueryCursor<List<?>> cur = executeSelectForDml(
+                        qryId,
                         qryDesc.schemaName(),
                         selectFieldsQry,
                         MvccUtils.mvccTracker(cctx, tx),
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
index 5b8db67..5b1cccd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
@@ -18,15 +18,12 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.sql.PreparedStatement;
-import org.apache.ignite.cluster.ClusterNode;
+import java.util.UUID;
 
 /**
  * Map query info.
  */
 public class MapH2QueryInfo extends H2QueryInfo {
-    /** Node. */
-    private final ClusterNode node;
-
     /** Request id. */
     private final long reqId;
 
@@ -36,23 +33,22 @@ public class MapH2QueryInfo extends H2QueryInfo {
     /**
      * @param stmt Query statement.
      * @param sql Query statement.
-     * @param node Originator node ID
+     * @param nodeId Originator node id.
+     * @param qryId Query id.
      * @param reqId Request ID.
      * @param segment Segment.
      */
-    public MapH2QueryInfo(PreparedStatement stmt, String sql,
-        ClusterNode node, long reqId, int segment) {
-        super(QueryType.MAP, stmt, sql);
+    public MapH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId,
+        int segment) {
+        super(QueryType.MAP, stmt, sql, nodeId, qryId);
 
-        this.node = node;
         this.reqId = reqId;
         this.segment = segment;
     }
 
     /** {@inheritDoc} */
     @Override protected void printInfo(StringBuilder msg) {
-        msg.append(", node=").append(node)
-            .append(", reqId=").append(reqId)
+        msg.append(", reqId=").append(reqId)
             .append(", segment=").append(segment);
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
index 9474c6a..ea88594 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.sql.PreparedStatement;
+import java.util.UUID;
 
 /**
  * Reduce query info.
@@ -29,10 +30,12 @@ public class ReduceH2QueryInfo extends H2QueryInfo {
     /**
      * @param stmt Query statement.
      * @param sql Query statement.
+     * @param nodeId Originator node id.
+     * @param qryId Query id.
      * @param reqId Request ID.
      */
-    public ReduceH2QueryInfo(PreparedStatement stmt, String sql, long reqId) {
-        super(QueryType.REDUCE, stmt, sql);
+    public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId) {
+        super(QueryType.REDUCE, stmt, sql, nodeId, qryId);
 
         this.reqId = reqId;
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ccf967b..d8727c2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -246,6 +246,7 @@ public class GridMapQueryExecutor {
                     (GridPlainCallable<Void>)() -> {
                         try (TraceSurroundings ignored = MTC.supportContinual(span)) {
                             onQueryRequest0(node,
+                                req.queryId(),
                                 req.requestId(),
                                 segment,
                                 req.schemaName(),
@@ -273,6 +274,7 @@ public class GridMapQueryExecutor {
             }
 
             onQueryRequest0(node,
+                req.queryId(),
                 req.requestId(),
                 singleSegment,
                 req.schemaName(),
@@ -300,6 +302,7 @@ public class GridMapQueryExecutor {
 
     /**
      * @param node Node authored request.
+     * @param qryId Query ID.
      * @param reqId Request ID.
      * @param segmentId index segment ID.
      * @param schemaName Schema name.
@@ -320,6 +323,7 @@ public class GridMapQueryExecutor {
      */
     private void onQueryRequest0(
         final ClusterNode node,
+        final long qryId,
         final long reqId,
         final int segmentId,
         final String schemaName,
@@ -454,7 +458,7 @@ public class GridMapQueryExecutor {
 
                         H2Utils.bindParameters(stmt, params0);
 
-                        MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node, reqId, segmentId);
+                        MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);
 
                         ResultSet rs = h2.executeSqlQueryWithTimer(
                             stmt,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 2564196..4609600 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -320,6 +320,7 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param qryId Query ID.
      * @param schemaName Schema name.
      * @param qry Query.
      * @param keepBinary Keep binary.
@@ -334,8 +335,9 @@ public class GridReduceQueryExecutor {
      * @param pageSize Page size.
      * @return Rows iterator.
      */
-    @SuppressWarnings({"BusyWait", "IfMayBeConditional"})
+    @SuppressWarnings("IfMayBeConditional")
     public Iterator<List<?>> query(
+        long qryId,
         String schemaName,
         final GridCacheTwoStepQuery qry,
         boolean keepBinary,
@@ -434,6 +436,7 @@ public class GridReduceQueryExecutor {
                     cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true));
 
                     GridH2QueryRequest req = new GridH2QueryRequest()
+                        .queryId(qryId)
                         .requestId(qryReqId)
                         .topologyVersion(topVer)
                         .pageSize(pageSize)
@@ -524,7 +527,8 @@ public class GridReduceQueryExecutor {
 
                             H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params)));
 
-                            ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), qryReqId);
+                            ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(),
+                                ctx.localNodeId(), qryId, qryReqId);
 
                             ResultSet res = h2.executeSqlQueryWithTimer(stmt,
                                 conn,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index db1c901..bed036d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -160,6 +161,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /** TX details holder for {@code SELECT FOR UPDATE}, or {@code null} if not applicable. */
     private GridH2SelectForUpdateTxDetails txReq;
 
+    /** Id of the query assigned by {@link RunningQueryManager} on originator node. */
+    private long qryId;
+
     /** */
     private boolean explicitTimeout;
 
@@ -189,6 +193,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
         schemaName = req.schemaName;
         mvccSnapshot = req.mvccSnapshot;
         txReq = req.txReq;
+        qryId = req.qryId;
         explicitTimeout = req.explicitTimeout;
     }
 
@@ -505,6 +510,27 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     }
 
     /**
+     * Id of the query assigned by {@link RunningQueryManager} on originator node.
+     *
+     * @return Query id.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * Sets id of the query assigned by {@link RunningQueryManager}.
+     *
+     * @param queryId Query id.
+     * @return {@code this} for chaining.
+     */
+    public GridH2QueryRequest queryId(long queryId) {
+        this.qryId = queryId;
+
+        return this;
+    }
+
+    /**
      * Checks if data page scan enabled.
      *
      * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
@@ -667,6 +693,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
                 writer.incrementState();
 
+            case 15:
+                if (!writer.writeLong("qryId", qryId))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -800,6 +831,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
                 reader.incrementState();
 
+            case 15:
+                qryId = reader.readLong("qryId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -812,7 +850,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16;
     }
 
     /** {@inheritDoc} */