You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/02/07 09:30:28 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9767: IGNITE-16317 Add KILL QUERY command support, integrate calcite queries into RunningQueryManager.

alex-plekhanov commented on a change in pull request #9767:
URL: https://github.com/apache/ignite/pull/9767#discussion_r799474817



##########
File path: modules/calcite/src/main/codegen/includes/parserImpls.ftl
##########
@@ -559,3 +559,31 @@ SqlNode SqlKillComputeTask():
         return IgniteSqlKill.createComputeTaskKill(s.end(this), sesId);
     }
 }
+
+boolean IsAsyncOpt() :
+{
+}
+{
+    <ASYNC> { return true; } | { return false; }
+}
+
+SqlNode SqlKillQuery():
+{
+    final Span s;
+    final boolean isAsync;
+}
+{
+    <KILL> { s = span(); } <QUERY>
+    isAsync= IsAsyncOpt()
+    <QUOTED_STRING> {
+        String rawQueryId = SqlParserUtil.parseString(token.image);
+        SqlCharStringLiteral queryIdLiteral = SqlLiteral.createCharString(rawQueryId, getPos());
+        try {
+            Pair<UUID, Long> id = IgniteSqlKill.parseGlobalQueryId(rawQueryId);
+            return IgniteSqlKill.createQueryKill(s.end(this), queryIdLiteral, id.getKey(), id.getValue(), isAsync);
+        }
+        catch (Exception e) {

Review comment:
       We get this exception only on an NPE when accessing `id.getKey()`, so it's better to check for a null value after parsing and throw `illegalGlobalQueryId` when the parsing result is null.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
##########
@@ -21,50 +21,85 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import org.apache.ignite.IgniteLogger;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryManager;
 import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ddl.RunningQueryManagerWrapper;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Registry of the running queries.
  */
-public class QueryRegistryImpl implements QueryRegistry {
+public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
     /** */
-    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
 
     /** */
-    private final IgniteLogger log;
+    private final GridRunningQueryManager runningQryMgr;
 
     /** */
-    public QueryRegistryImpl(IgniteLogger log) {
-        this.log = log;
+    public QueryRegistryImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        runningQryMgr = new RunningQueryManagerWrapper(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public RunningQuery register(RunningQuery qry) {
-        return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+    @Override public RunningQuery register(String sql, String schema, RunningQuery qry) {

Review comment:
       Perhaps it's better to add `sql` and `schemaName` methods to the `RunningQuery` interface (`Query` already has all this data) and leave the registration method parameters as is.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/AbstractService.java
##########
@@ -27,10 +27,14 @@
     /** */
     protected final IgniteLogger log;
 
+    /** */
+    protected final GridKernalContext kctx;

Review comment:
       Let's store it only in `QueryRegistryImpl`

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
##########
@@ -21,50 +21,85 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import org.apache.ignite.IgniteLogger;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryManager;
 import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ddl.RunningQueryManagerWrapper;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Registry of the running queries.
  */
-public class QueryRegistryImpl implements QueryRegistry {
+public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
     /** */
-    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
 
     /** */
-    private final IgniteLogger log;
+    private final GridRunningQueryManager runningQryMgr;
 
     /** */
-    public QueryRegistryImpl(IgniteLogger log) {
-        this.log = log;
+    public QueryRegistryImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        runningQryMgr = new RunningQueryManagerWrapper(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public RunningQuery register(RunningQuery qry) {
-        return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+    @Override public RunningQuery register(String sql, String schema, RunningQuery qry) {
+        return runningQrys.computeIfAbsent(qry.id(), k -> {
+            String nodeId;
+            if (qry.initiatorNodeId() != null)
+                nodeId = qry.initiatorNodeId().toString();
+            else
+                nodeId = kctx.discovery().localNode().id().toString();

Review comment:
       Let's have the root query provide `qry.initiatorNodeId()` instead. Also, `kctx.discovery().localNode().id()` can be simplified to `kctx.localNodeId()`.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
##########
@@ -143,6 +147,11 @@ public String sql() {
         return sql;
     }
 
+    /** */
+    public SchemaPlus schema() {
+        return schema;

Review comment:
       Can be obtained from `ctx.schema()`, no need to store it again.

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/sql/SqlDdlParserTest.java
##########
@@ -705,6 +707,34 @@ public void killContinuousQuery() throws Exception {
         assertParserThrows("kill continuous", SqlParseException.class);
     }
 
+    /**
+     * Test kill continuous query parsing.
+     */
+    @Test
+    public void killSqlQuery() throws Exception {
+        IgniteSqlKill killTask;
+
+        UUID nodeId = UUID.randomUUID();
+        long queryId = ThreadLocalRandom.current().nextLong();
+
+        killTask = parse("kill query '" + nodeId + "_" + queryId + "'");
+        assertTrue(killTask instanceof IgniteSqlKillQuery);
+        assertEquals(nodeId, ((IgniteSqlKillQuery)killTask).nodeId());
+        assertEquals(queryId, ((IgniteSqlKillQuery)killTask).queryId());
+        assertFalse(((IgniteSqlKillQuery)killTask).isAsync());
+
+        killTask = parse("kill query async '" + nodeId + "_" + queryId + "'");
+        assertTrue(killTask instanceof IgniteSqlKillQuery);
+        assertEquals(nodeId, ((IgniteSqlKillQuery)killTask).nodeId());
+        assertEquals(queryId, ((IgniteSqlKillQuery)killTask).queryId());
+        assertTrue(((IgniteSqlKillQuery)killTask).isAsync());
+
+        assertParserThrows("kill query '1233415'", SqlParseException.class);
+        assertParserThrows("kill query '" + UUID.randomUUID() + "'_a1233415'", SqlParseException.class);

Review comment:
       Looks like the first quote in `'_a1233415'` is redundant.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/RunningQueryManagerWrapper.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec.ddl;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.GridRunningQueryManager;
+import org.apache.ignite.internal.processors.query.QueryHistory;
+import org.apache.ignite.internal.processors.query.QueryHistoryKey;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper of running query manager.
+ */
+public class RunningQueryManagerWrapper implements GridRunningQueryManager {
+    /** Running query manager supplier. */
+    private final Supplier<GridRunningQueryManager> qryMgrSupp;
+
+    /**
+     * @param ctx Grid kernal context.
+     */
+    public RunningQueryManagerWrapper(GridKernalContext ctx) {
+        qryMgrSupp = () -> {
+            GridQueryProcessor queryProc = ctx.query();
+            if (queryProc == null)
+                return null;
+
+            IgniteH2Indexing idx = (IgniteH2Indexing)queryProc.getIndexing();

Review comment:
       Additional H2 dependency should be avoided. I propose to remove this wrapper altogether, add a `GridRunningQueryManager` getter to `GridQueryProcessor`, and use this method directly from `SqlCommandProcessor` and `QueryRegistryImpl` via `ctx.query().runningQueryManager()`.

##########
File path: modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
##########
@@ -217,7 +219,9 @@ public void testCancelUnknownService() {
     /** */
     @Test
     public void testCancelUnknownSQLQuery() {
-        qryMBean.cancelSQL(srvs.get(0).localNode().id().toString() + "_42");
+        GridTestUtils.assertThrowsWithCause(
+            () -> qryMBean.cancelSQL(srvs.get(0).localNode().id().toString() + "_42"),
+            IgniteSQLException.class);

Review comment:
       Throwing an Ignite exception from an MBean is incorrect since it can't be deserialized by a JMX client. I know this behaviour existed before your changes, but asserting it is incorrect too. Let's have `cancelSQL` wrap the exception correctly (only the message wrapped into a `RuntimeException`).

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
##########
@@ -47,8 +48,9 @@
      * Unregister query by identifier.
      *
      * @param id Query identifier.
+     * @param failReason exception that caused query execution fail, or {@code null} if query succeded.
      */
-    void unregister(UUID id);
+    void unregister(UUID id, @Nullable Throwable failReason);

Review comment:
       Looks like we use `failReason` only for DDL statements, which makes it almost useless, so I propose either removing this parameter or using it more widely, including for regular queries.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
##########
@@ -21,50 +21,85 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import org.apache.ignite.IgniteLogger;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryManager;
 import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ddl.RunningQueryManagerWrapper;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Registry of the running queries.
  */
-public class QueryRegistryImpl implements QueryRegistry {
+public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
     /** */
-    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
 
     /** */
-    private final IgniteLogger log;
+    private final GridRunningQueryManager runningQryMgr;
 
     /** */
-    public QueryRegistryImpl(IgniteLogger log) {
-        this.log = log;
+    public QueryRegistryImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        runningQryMgr = new RunningQueryManagerWrapper(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public RunningQuery register(RunningQuery qry) {
-        return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+    @Override public RunningQuery register(String sql, String schema, RunningQuery qry) {
+        return runningQrys.computeIfAbsent(qry.id(), k -> {
+            String nodeId;
+            if (qry.initiatorNodeId() != null)
+                nodeId = qry.initiatorNodeId().toString();
+            else
+                nodeId = kctx.discovery().localNode().id().toString();
+
+            Long locId = runningQryMgr.register(sql, GridCacheQueryType.SQL_FIELDS, schema, false,
+                createCancelToken(qry), nodeId);

Review comment:
       `qryInitiatorId` here has another meaning, not node id.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
##########
@@ -324,29 +394,235 @@ public void stop() {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void cancelQuery(long queryId, @Nullable UUID nodeId, boolean async) {
+        GridFutureAdapter<String> fut = new GridFutureAdapter<>();
+
+        lock.readLock().lock();
+
+        try {
+            if (stopped)
+                throw new IgniteSQLException("Failed to cancel query due to node is stopped [nodeId=" + nodeId +
+                    ", qryId=" + queryId + "]");
+
+            final ClusterNode node = nodeId != null ? ctx.discovery().node(nodeId) : ctx.discovery().localNode();
+
+            if (node != null) {
+                KillQueryRun qryRun = new KillQueryRun(nodeId, queryId, fut);
+
+                long reqId = qryCancelReqCntr.incrementAndGet();
+
+                cancellationRuns.put(reqId, qryRun);
+
+                final GridQueryKillRequest request = new GridQueryKillRequest(reqId, queryId, async);
+
+                if (node.isLocal() && !async) {
+                    locNodeMsgHnd.apply(node, request);
+                }
+                else {
+                    try {
+                        if (node.isLocal()) {
+                            ctx.closure().runLocal(new GridPlainRunnable() {
+                                @Override public void run() {
+                                    if (!busyLock.enterBusy())
+                                        return;
+
+                                    try {
+                                        locNodeMsgHnd.apply(node, request);
+                                    }
+                                    finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                }
+                            }, GridIoPolicy.MANAGEMENT_POOL);
+                        }
+                        else {
+                            ctx.io().sendGeneric(node, GridTopic.TOPIC_QUERY, GridTopic.TOPIC_QUERY.ordinal(), request,
+                                GridIoPolicy.MANAGEMENT_POOL);
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        cancellationRuns.remove(reqId);
+
+                        throw new IgniteSQLException("Failed to cancel query due communication problem " +
+                            "[nodeId=" + node.id() + ",qryId=" + queryId + ", errMsg=" + e.getMessage() + "]");
+                    }
+                }
+            }
+            else
+                throw new IgniteSQLException("Failed to cancel query, node is not alive [nodeId=" + nodeId + ", qryId="
+                    + queryId + "]");
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
+        try {
+            String err = fut.get();
+
+            if (err != null)
+                throw new IgniteSQLException("Failed to cancel query [nodeId=" + nodeId + ", qryId="
+                    + queryId + ", err=" + err + "]");
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSQLException("Failed to cancel query [nodeId=" + nodeId + ", qryId="
+                + queryId + ", err=" + e + "]", e);
+        }
+    }
+
     /**
-     * Gets query history statistics. Size of history could be configured via {@link
-     * SqlConfiguration#setSqlQueryHistorySize(int)}
+     * Client disconnected callback.
+     */
+    public void onDisconnected() {
+        completeCancellationFutures("Failed to cancel query because local client node has been disconnected from the cluster");
+    }
+
+    /**
+     * @param err Text of error to complete futures.
+     */
+    private void completeCancellationFutures(@Nullable String err) {
+        lock.writeLock().lock();
+
+        try {
+            Iterator<KillQueryRun> it = cancellationRuns.values().iterator();
+
+            while (it.hasNext()) {
+                KillQueryRun qryRun = it.next();
+
+                qryRun.cancelFuture().onDone(err);
+
+                it.remove();
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    public void onMessage(UUID nodeId, Object msg) {

Review comment:
       private

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryManager.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Keep information about all running queries.
+ */
+public interface GridRunningQueryManager {

Review comment:
       `RunningQueryManager` is already in the core module, do we really need an additional interface? Looks redundant. 

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
##########
@@ -2472,12 +2472,17 @@ public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+    @Override public Collection<GridRunningQueryInfo> runningLocalQueries(long duration) {
         return runningQryMgr.longRunningQueries(duration);
     }
 
     /** {@inheritDoc} */
-    @Override public void cancelQueries(Collection<Long> queries) {
+    @Override public void cancelQuery(long queryId, @Nullable UUID nodeId, boolean async) {
+        runningQryMgr.cancelQuery(queryId, nodeId, async);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelLocalQueries(Collection<Long> queries) {
         if (!F.isEmpty(queries)) {
             for (Long qryId : queries)
                 runningQryMgr.cancel(qryId);

Review comment:
       There are methods `cancel` and `cancelQuery` in `RunningQueryManager`, let's align naming. Both should contain "query" or both should not. `cancel` with id should also contain "local". 

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
##########
@@ -324,29 +394,235 @@ public void stop() {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void cancelQuery(long queryId, @Nullable UUID nodeId, boolean async) {
+        GridFutureAdapter<String> fut = new GridFutureAdapter<>();
+
+        lock.readLock().lock();
+
+        try {
+            if (stopped)
+                throw new IgniteSQLException("Failed to cancel query due to node is stopped [nodeId=" + nodeId +
+                    ", qryId=" + queryId + "]");
+
+            final ClusterNode node = nodeId != null ? ctx.discovery().node(nodeId) : ctx.discovery().localNode();
+
+            if (node != null) {
+                KillQueryRun qryRun = new KillQueryRun(nodeId, queryId, fut);
+
+                long reqId = qryCancelReqCntr.incrementAndGet();
+
+                cancellationRuns.put(reqId, qryRun);
+
+                final GridQueryKillRequest request = new GridQueryKillRequest(reqId, queryId, async);
+
+                if (node.isLocal() && !async) {
+                    locNodeMsgHnd.apply(node, request);
+                }
+                else {
+                    try {
+                        if (node.isLocal()) {
+                            ctx.closure().runLocal(new GridPlainRunnable() {
+                                @Override public void run() {
+                                    if (!busyLock.enterBusy())
+                                        return;
+
+                                    try {
+                                        locNodeMsgHnd.apply(node, request);
+                                    }
+                                    finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                }
+                            }, GridIoPolicy.MANAGEMENT_POOL);
+                        }
+                        else {
+                            ctx.io().sendGeneric(node, GridTopic.TOPIC_QUERY, GridTopic.TOPIC_QUERY.ordinal(), request,
+                                GridIoPolicy.MANAGEMENT_POOL);
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        cancellationRuns.remove(reqId);
+
+                        throw new IgniteSQLException("Failed to cancel query due communication problem " +
+                            "[nodeId=" + node.id() + ",qryId=" + queryId + ", errMsg=" + e.getMessage() + "]");
+                    }
+                }
+            }
+            else
+                throw new IgniteSQLException("Failed to cancel query, node is not alive [nodeId=" + nodeId + ", qryId="
+                    + queryId + "]");
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
+        try {
+            String err = fut.get();
+
+            if (err != null)
+                throw new IgniteSQLException("Failed to cancel query [nodeId=" + nodeId + ", qryId="

Review comment:
       `{}`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
##########
@@ -324,29 +394,235 @@ public void stop() {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void cancelQuery(long queryId, @Nullable UUID nodeId, boolean async) {
+        GridFutureAdapter<String> fut = new GridFutureAdapter<>();
+
+        lock.readLock().lock();
+
+        try {
+            if (stopped)
+                throw new IgniteSQLException("Failed to cancel query due to node is stopped [nodeId=" + nodeId +
+                    ", qryId=" + queryId + "]");
+
+            final ClusterNode node = nodeId != null ? ctx.discovery().node(nodeId) : ctx.discovery().localNode();
+
+            if (node != null) {
+                KillQueryRun qryRun = new KillQueryRun(nodeId, queryId, fut);
+
+                long reqId = qryCancelReqCntr.incrementAndGet();
+
+                cancellationRuns.put(reqId, qryRun);
+
+                final GridQueryKillRequest request = new GridQueryKillRequest(reqId, queryId, async);
+
+                if (node.isLocal() && !async) {

Review comment:
       Redundant braces

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
##########
@@ -21,50 +21,85 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import org.apache.ignite.IgniteLogger;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryManager;
 import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ddl.RunningQueryManagerWrapper;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Registry of the running queries.
  */
-public class QueryRegistryImpl implements QueryRegistry {
+public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
     /** */
-    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
 
     /** */
-    private final IgniteLogger log;
+    private final GridRunningQueryManager runningQryMgr;
 
     /** */
-    public QueryRegistryImpl(IgniteLogger log) {
-        this.log = log;
+    public QueryRegistryImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        runningQryMgr = new RunningQueryManagerWrapper(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public RunningQuery register(RunningQuery qry) {
-        return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+    @Override public RunningQuery register(String sql, String schema, RunningQuery qry) {
+        return runningQrys.computeIfAbsent(qry.id(), k -> {
+            String nodeId;
+            if (qry.initiatorNodeId() != null)
+                nodeId = qry.initiatorNodeId().toString();
+            else
+                nodeId = kctx.discovery().localNode().id().toString();
+
+            Long locId = runningQryMgr.register(sql, GridCacheQueryType.SQL_FIELDS, schema, false,
+                createCancelToken(qry), nodeId);
+
+            return Pair.of(locId, qry);

Review comment:
       Perhaps we can store `locId` in query, instead of creating Pair. WDYT?

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests `KILL QUERY` command.
+ */
+@RunWith(Parameterized.class)
+public class KillQueryCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+    /** If {@code true}, cancel asynchronously. */
+    @Parameterized.Parameter(0)
+    public boolean isAsync;
+
+    /** If {@code true}, cancel on client(initiator), otherwise on server. */
+    @Parameterized.Parameter(1)
+    public boolean cancelOnClient;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "isAsync={0},cancelOnClient={1}")
+    public static Collection<?> parameters() {
+        return Stream.of(true, false).flatMap(p -> Stream.of(new Object[] {p, true}, new Object[] {p, false}))
+            .collect(Collectors.toList());
+    }
+
+    /** */
+    @Test
+    public void testCancelUnknownSqlQuery() {
+        IgniteEx srv = grid(0);
+        UUID nodeId = cancelOnClient ? client.localNode().id() : srv.localNode().id();
+        Long queryId = ThreadLocalRandom.current().nextLong(10, 10000);
+        GridTestUtils.assertThrows(log, () -> {
+                sql(cancelOnClient ? client : srv, "KILL QUERY" + (isAsync ? " ASYNC '" : " '") + nodeId + "_"
+                    + queryId + "'");
+            },
+            IgniteException.class,
+            String.format("Failed to cancel query [nodeId=%s, qryId=%d, err=Query with provided ID doesn't exist " +
+                    "[nodeId=%s, qryId=%d]]", nodeId, queryId, nodeId, queryId)
+        );
+    }
+
+    /** */
+    @Test
+    public void testCancelSqlQuery() throws Exception {
+        IgniteEx srv = grid(0);
+        CalciteQueryProcessor srvEngine = queryProcessor(srv);
+
+        sql("CREATE TABLE person (id int, val varchar)");
+        sql("INSERT INTO person (id, val) VALUES (?, ?)", 0, "val0");
+
+        IgniteCacheTable oldTbl = (IgniteCacheTable)srvEngine.schemaHolder().schema("PUBLIC").getTable("PERSON");
+
+        CountDownLatch scanLatch = new CountDownLatch(1);
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteCacheTable newTbl = new CacheTableImpl(srv.context(), oldTbl.descriptor()) {
+            @Override public <Row> Iterable<Row> scan(
+                ExecutionContext<Row> execCtx,
+                ColocationGroup grp,
+                Predicate<Row> filter,
+                Function<Row, Row> rowTransformer,
+                @Nullable ImmutableBitSet usedColumns
+            ) {
+                return new Iterable<Row>() {
+                    @NotNull @Override public Iterator<Row> iterator() {
+                        scanLatch.countDown();
+
+                        return new Iterator<Row>() {
+                            @Override public boolean hasNext() {
+                                // Produce rows until stopped.
+                                return !stop.get();
+                            }
+
+                            @Override public Row next() {
+                                if (stop.get())
+                                    throw new NoSuchElementException();
+
+                                return execCtx.rowHandler().factory().create();
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        srvEngine.schemaHolder().schema("PUBLIC").add("PERSON", newTbl);
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql("SELECT * FROM person"));
+
+        try {
+            scanLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+            Optional<String> srvQueryId = sql(srv, "SELECT * FROM SYS.SQL_QUERIES ORDER BY START_TIME")
+                .stream()
+                .filter(q -> ((String)q.get(1)).contains("PERSON")).map(q -> (String)q.get(0))
+                .findFirst();
+
+            assertTrue(srvQueryId.isPresent());
+
+            Optional<String> clientQueryId = sql(srv, "SELECT * FROM SYS.SQL_QUERIES ORDER BY START_TIME")
+                .stream()
+                .filter(q -> ((String)q.get(1)).contains("PERSON")).map(q -> (String)q.get(0))
+                .findFirst();
+
+            assertTrue(clientQueryId.isPresent());
+
+            GridTestUtils.runAsync(() -> sql(cancelOnClient ? client : srv,
+                "KILL QUERY" + (isAsync ? " ASYNC '" : " '") + clientQueryId.get() + "'"));
+
+            Assert.assertTrue(GridTestUtils.waitForCondition(
+                () -> !sql(srv, "SELECT * FROM SYS.SQL_QUERIES ORDER BY START_TIME")
+                    .stream()
+                    .filter(q -> ((String)q.get(1)).contains("PERSON")).map(q -> (String)q.get(0))
+                    .findAny().isPresent(), TIMEOUT));
+
+            Assert.assertTrue(GridTestUtils.waitForCondition(
+                () -> !sql(client, "SELECT * FROM SYS.SQL_QUERIES ORDER BY START_TIME")
+                    .stream()
+                    .filter(q -> ((String)q.get(1)).contains("PERSON")).map(q -> (String)q.get(0))
+                    .findAny().isPresent(), TIMEOUT));

Review comment:
       ```suggestion
               SystemView<SqlQueryView> srvView = srv.context().systemView().view(SQL_QRY_VIEW);
   
               assertFalse(F.isEmpty(srvView));
   
               SystemView<SqlQueryView> clientView = client.context().systemView().view(SQL_QRY_VIEW);
   
               assertFalse(F.isEmpty(clientView));
   
               String clientQryId = F.first(clientView).queryId();
   
               GridTestUtils.runAsync(() -> sql(cancelOnClient ? client : srv,
                   "KILL QUERY" + (isAsync ? " ASYNC '" : " '") + clientQryId + "'"));
   
               assertTrue(GridTestUtils.waitForCondition(() -> F.isEmpty(srvView), TIMEOUT));
   
               assertTrue(GridTestUtils.waitForCondition(() -> F.isEmpty(clientView), TIMEOUT));
   ```

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/QueryMXBeanImpl.java
##########
@@ -105,8 +104,7 @@ public QueryMXBeanImpl(GridKernalContext ctx) {
      *
      */
     public void cancelSQL(UUID originNodeId, long qryId) {
-        ctx.grid().compute(ctx.grid().cluster().forNodeId(originNodeId))
-            .broadcast(new CancelSQLOnInitiator(), qryId);
+        ctx.query().getIndexing().cancelQuery(qryId, originNodeId, false);

Review comment:
       Why not `ctx.query().cancelQuery(qryId, originNodeId, false);`? Or perhaps `ctx.query().runningQueryManager().cancelQuery(qryId, originNodeId, false);` (and remove `cancelQuery` from `GridQueryProcessor`).

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
##########
@@ -2472,12 +2472,17 @@ public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+    @Override public Collection<GridRunningQueryInfo> runningLocalQueries(long duration) {

Review comment:
       Why "local" here? This method shows all the registered queries. The Ignite-indexing engine registers only local queries, but the Calcite-based engine registers queries from other initiators too. Let's keep the name as is.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/KillQueryRun.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * Kill Query run context.
+ */
+class KillQueryRun {

Review comment:
       The old class (in the indexing module) is not removed. The class name is strange. I think it should extend the future adapter and be named something like `KillQueryFuture`. Also, perhaps it should be an inner class of `RunningQueryManager`, since it is not used in other places.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
##########
@@ -21,50 +21,85 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import org.apache.ignite.IgniteLogger;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryManager;
 import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.exec.ddl.RunningQueryManagerWrapper;
+import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Registry of the running queries.
  */
-public class QueryRegistryImpl implements QueryRegistry {
+public class QueryRegistryImpl extends AbstractService implements QueryRegistry {
     /** */
-    private final ConcurrentMap<UUID, RunningQuery> runningQrys = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Pair<Long, RunningQuery>> runningQrys = new ConcurrentHashMap<>();
 
     /** */
-    private final IgniteLogger log;
+    private final GridRunningQueryManager runningQryMgr;
 
     /** */
-    public QueryRegistryImpl(IgniteLogger log) {
-        this.log = log;
+    public QueryRegistryImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        runningQryMgr = new RunningQueryManagerWrapper(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public RunningQuery register(RunningQuery qry) {
-        return runningQrys.computeIfAbsent(qry.id(), k -> qry);
+    @Override public RunningQuery register(String sql, String schema, RunningQuery qry) {
+        return runningQrys.computeIfAbsent(qry.id(), k -> {
+            String nodeId;
+            if (qry.initiatorNodeId() != null)
+                nodeId = qry.initiatorNodeId().toString();
+            else
+                nodeId = kctx.discovery().localNode().id().toString();
+
+            Long locId = runningQryMgr.register(sql, GridCacheQueryType.SQL_FIELDS, schema, false,
+                createCancelToken(qry), nodeId);
+
+            return Pair.of(locId, qry);
+        }).right;
     }
 
     /** {@inheritDoc} */
     @Override public RunningQuery query(UUID id) {
-        return runningQrys.get(id);
+        Pair<Long, RunningQuery> value = runningQrys.get(id);
+        return value != null ? value.right : null;
     }
 
     /** {@inheritDoc} */
-    @Override public void unregister(UUID id) {
-        runningQrys.remove(id);
+    @Override public void unregister(UUID id, @Nullable Throwable failReason) {
+        Pair<Long, RunningQuery> value = runningQrys.remove(id);
+        if (value != null)
+            runningQryMgr.unregister(value.left, failReason);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<? extends RunningQuery> runningQueries() {
-        return runningQrys.values();
+        return runningQrys.values().stream().map(Pair::getValue).collect(Collectors.toList());
     }
 
     /** {@inheritDoc} */
     @Override public void tearDown() {
-        runningQrys.values().forEach(q -> IgniteUtils.close(q::cancel, log));
-
+        runningQrys.values().forEach(q -> IgniteUtils.close(q.right::cancel, log));
         runningQrys.clear();
     }
+
+    /** */
+    private static GridQueryCancel createCancelToken(RunningQuery qry) {
+        GridQueryCancel token = new GridQueryCancel();
+        try {
+            token.add(qry::cancel);
+        }
+        catch (QueryCancelledException e) {

Review comment:
       Rename `e` to `ignored` to avoid warnings in IDEA.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org