You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/17 10:44:14 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9412: IGNITE-15423 Support for kill queries.

alex-plekhanov commented on a change in pull request #9412:
URL: https://github.com/apache/ignite/pull/9412#discussion_r710885509



##########
File path: modules/calcite/src/main/codegen/includes/parserImpls.ftl
##########
@@ -391,3 +391,111 @@ SqlDrop SqlDropUser(Span s, boolean replace) :
         return new IgniteSqlDropUser(s.end(this), user);
     }
 }
+
+SqlNumericLiteral QueryIdLiteral() :
+{
+    final Span s;
+}
+{
+    <PLUS> <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+|
+    <MINUS> { s = span(); } <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createNegative(SqlLiteral.createExactNumeric(token.image, getPos()), s.end(this));
+    }
+|
+    <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+}
+
+SqlNode SqlKillScanQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral cacheName;
+    final SqlNumericLiteral queryId;
+    final String rawUuid;
+}{
+    <KILL> { s = span(); } <SCAN>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);
+        if (!IgniteSqlKill.isUuid(rawUuid)) {
+            throw SqlUtil.newContextException(getPos(), IgniteResource.INSTANCE.illegalUuid(rawUuid));
+        }
+        originNodeId = SqlLiteral.createCharString(rawUuid, getPos());
+    }
+    <QUOTED_STRING> {
+        cacheName = SqlLiteral.createCharString(SqlParserUtil.parseString(token.image), getPos());
+    }
+    queryId = QueryIdLiteral() {
+        return IgniteSqlKill.createScanQueryKill(s.end(this), originNodeId, cacheName, queryId);
+    }
+}
+
+SqlNode SqlKillContinuousQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral routineId;
+    String rawUuid;
+}{
+    <KILL> { s = span(); } <CONTINUOUS>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);

Review comment:
       Perhaps we can create a new method and deduplicate a little bit (also inline `IgniteSqlKill.isUuid` and get rid of this method in `IgniteSqlKill`)

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {

Review comment:
       `finally` should be on the next line according to codestyle

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {
+            computeLatch.countDown();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCancelTx() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        int testKey = PAGES_CNT * (PAGE_SZ + 1);
+
+        try (Transaction tx = client.transactions().txStart()) {
+            cache.put(testKey, 1);
+
+            executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+
+            assertThrowsWithCause(tx::commit, IgniteException.class);
+        }
+
+        assertNull(cache.get(testKey));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelService() throws Exception {
+        String serviceName = "MY_SERVICE";
+
+        ServiceConfiguration scfg = new ServiceConfiguration();
+        scfg.setName(serviceName);
+        scfg.setMaxPerNodeCount(1);
+        scfg.setNodeFilter(grid(0).cluster().predicate());
+        scfg.setService(new TestServiceImpl());
+
+        client.services().deploy(scfg);
+
+        TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
+        assertNotNull(svc);
+
+        executeSql(client, "KILL SERVICE '" + serviceName + "'");
+        boolean res = waitForCondition(() -> grid(0).services().serviceDescriptors().isEmpty(), TIMEOUT);
+        assertTrue(res);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelContinuousQuery() throws Exception {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        cq.setInitialQuery(new ScanQuery<>());
+        cq.setTimeInterval(1_000L);
+        cq.setPageSize(PAGE_SZ);
+        cq.setLocalListener(events -> {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) {
+                assertNotNull(e);
+
+                cntr.incrementAndGet();
+            }
+        });
+
+        cache.query(cq);
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        boolean res = waitForCondition(() -> cntr.get() == PAGE_SZ * PAGE_SZ, TIMEOUT);
+        assertTrue(res);
+
+        Map<UUID, Object> routines = getFieldValue(grid(0).context().continuous(), "rmtInfos");

Review comment:
       Let's use continues queries system view instead of reflection, for example:
   ```
           SystemView<ContinuousQueryView> qrys = grid(0).context().systemView().view(CQ_SYS_VIEW);
           assertEquals(1, qrys.size());
           ContinuousQueryView view = qrys.iterator().next();
   
           UUID nodeId = view.nodeId();
           UUID routineId = view.routineId();
   ```

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {
+            computeLatch.countDown();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCancelTx() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        int testKey = PAGES_CNT * (PAGE_SZ + 1);
+
+        try (Transaction tx = client.transactions().txStart()) {
+            cache.put(testKey, 1);
+
+            executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+
+            assertThrowsWithCause(tx::commit, IgniteException.class);
+        }
+
+        assertNull(cache.get(testKey));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelService() throws Exception {
+        String serviceName = "MY_SERVICE";
+
+        ServiceConfiguration scfg = new ServiceConfiguration();
+        scfg.setName(serviceName);
+        scfg.setMaxPerNodeCount(1);
+        scfg.setNodeFilter(grid(0).cluster().predicate());
+        scfg.setService(new TestServiceImpl());
+
+        client.services().deploy(scfg);
+
+        TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
+        assertNotNull(svc);
+
+        executeSql(client, "KILL SERVICE '" + serviceName + "'");
+        boolean res = waitForCondition(() -> grid(0).services().serviceDescriptors().isEmpty(), TIMEOUT);
+        assertTrue(res);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelContinuousQuery() throws Exception {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        cq.setInitialQuery(new ScanQuery<>());
+        cq.setTimeInterval(1_000L);
+        cq.setPageSize(PAGE_SZ);
+        cq.setLocalListener(events -> {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) {
+                assertNotNull(e);
+
+                cntr.incrementAndGet();
+            }
+        });
+
+        cache.query(cq);
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        boolean res = waitForCondition(() -> cntr.get() == PAGE_SZ * PAGE_SZ, TIMEOUT);
+        assertTrue(res);
+
+        Map<UUID, Object> routines = getFieldValue(grid(0).context().continuous(), "rmtInfos");
+        assertEquals(1, routines.size());
+        Map.Entry<UUID, Object> entry = routines.entrySet().iterator().next();
+
+        UUID nodeId = getFieldValue(entry.getValue(), "nodeId");
+        UUID routineId = entry.getKey();
+
+        executeSql(client, "KILL CONTINUOUS '" + nodeId + "' '" + routineId + "'");
+
+        long cnt = cntr.get();
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        res = waitForCondition(() -> cntr.get() > cnt, TIMEOUT);

Review comment:
       Too large timeout to wait for something not happening.
   Let's just wait for CQ system view become empty and check the counter after that:
   ```
           assertTrue(waitForCondition(() -> F.isEmpty(qrys), TIMEOUT));
           assertEquals(cnt, cntr.get());
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/SqlToNativeCommandConverter.java
##########
@@ -165,4 +182,56 @@ private static SqlAlterUserCommand convertAlterUser(IgniteSqlAlterUser sqlCmd, P
     private static SqlDropUserCommand convertDropUser(IgniteSqlDropUser sqlCmd, PlanningContext ctx) {
         return new SqlDropUserCommand(sqlCmd.user().getSimple());
     }
+
+    /**
+     * Converts KILL ... command.
+     */
+    private static SqlCommand convertKill(IgniteSqlKill cmd, PlanningContext pctx) {
+        if (cmd instanceof IgniteSqlKillScanQuery) {
+            IgniteSqlKillScanQuery cmd0 = (IgniteSqlKillScanQuery)cmd;
+            return new SqlKillScanQueryCommand(
+                UUID.fromString(cmd0.nodeId().getValueAs(String.class)),
+                cmd0.cacheName().getValueAs(String.class),
+                cmd0.queryId().longValue(true)
+            );
+        }
+        else if (cmd instanceof IgniteSqlKillContinuousQuery) {
+            IgniteSqlKillContinuousQuery cmd0 = (IgniteSqlKillContinuousQuery)cmd;
+            return new SqlKillContinuousQueryCommand(
+                UUID.fromString(cmd0.nodeId().getValueAs(String.class)),
+                UUID.fromString(cmd0.routineId().getValueAs(String.class))
+            );
+        }
+        else if (cmd instanceof IgniteSqlKillService) {
+            IgniteSqlKillService cmd0 = (IgniteSqlKillService)cmd;
+            return new SqlKillServiceCommand(cmd0.serviceName().getValueAs(String.class));
+        }
+        else if (cmd instanceof IgniteSqlKillTransaction) {
+            IgniteSqlKillTransaction cmd0 = (IgniteSqlKillTransaction)cmd;
+            return new SqlKillTransactionCommand(cmd0.xid().getValueAs(String.class));
+        }
+        else if (cmd instanceof IgniteSqlKillComputeTask) {
+            IgniteSqlKillComputeTask cmd0 = ( IgniteSqlKillComputeTask)cmd;
+            IgniteUuid sessId = IgniteUuid.fromString(cmd0.sessionId().getValueAs(String.class));
+            return new SqlKillComputeTaskCommand(sessId);
+        }
+        else {
+            throw new IgniteSQLException("Unsupported native operation [" +
+                "cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; " +
+                "querySql=\"" + pctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        }
+    }
+
+    /**
+     * Convert string literal to UUID.
+     */
+    private static UUID convertToUuid(SqlLiteral literal) {

Review comment:
       Not used




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org