Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/15 15:30:45 UTC

[GitHub] [ignite] ivandasch opened a new pull request #9412: IGNITE-15423 Support for kill queries.

ivandasch opened a new pull request #9412:
URL: https://github.com/apache/ignite/pull/9412


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch closed pull request #9412: IGNITE-15423 Support for kill queries.

Posted by GitBox <gi...@apache.org>.
ivandasch closed pull request #9412:
URL: https://github.com/apache/ignite/pull/9412


   





[GitHub] [ignite] ivandasch commented on a change in pull request #9412: IGNITE-15423 Support for kill queries.

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #9412:
URL: https://github.com/apache/ignite/pull/9412#discussion_r710966716



##########
File path: modules/calcite/src/main/codegen/includes/parserImpls.ftl
##########
@@ -391,3 +391,111 @@ SqlDrop SqlDropUser(Span s, boolean replace) :
         return new IgniteSqlDropUser(s.end(this), user);
     }
 }
+
+SqlNumericLiteral QueryIdLiteral() :
+{
+    final Span s;
+}
+{
+    <PLUS> <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+|
+    <MINUS> { s = span(); } <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createNegative(SqlLiteral.createExactNumeric(token.image, getPos()), s.end(this));
+    }
+|
+    <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+}
+
+SqlNode SqlKillScanQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral cacheName;
+    final SqlNumericLiteral queryId;
+    final String rawUuid;
+}{
+    <KILL> { s = span(); } <SCAN>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);
+        if (!IgniteSqlKill.isUuid(rawUuid)) {
+            throw SqlUtil.newContextException(getPos(), IgniteResource.INSTANCE.illegalUuid(rawUuid));
+        }
+        originNodeId = SqlLiteral.createCharString(rawUuid, getPos());
+    }
+    <QUOTED_STRING> {
+        cacheName = SqlLiteral.createCharString(SqlParserUtil.parseString(token.image), getPos());
+    }
+    queryId = QueryIdLiteral() {
+        return IgniteSqlKill.createScanQueryKill(s.end(this), originNodeId, cacheName, queryId);
+    }
+}
+
+SqlNode SqlKillContinuousQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral routineId;
+    String rawUuid;
+}{
+    <KILL> { s = span(); } <CONTINUOUS>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);

Review comment:
       What is inline? Java is not C++







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9412: IGNITE-15423 Support for kill queries.

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9412:
URL: https://github.com/apache/ignite/pull/9412#discussion_r710885509



##########
File path: modules/calcite/src/main/codegen/includes/parserImpls.ftl
##########
@@ -391,3 +391,111 @@ SqlDrop SqlDropUser(Span s, boolean replace) :
         return new IgniteSqlDropUser(s.end(this), user);
     }
 }
+
+SqlNumericLiteral QueryIdLiteral() :
+{
+    final Span s;
+}
+{
+    <PLUS> <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+|
+    <MINUS> { s = span(); } <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createNegative(SqlLiteral.createExactNumeric(token.image, getPos()), s.end(this));
+    }
+|
+    <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+}
+
+SqlNode SqlKillScanQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral cacheName;
+    final SqlNumericLiteral queryId;
+    final String rawUuid;
+}{
+    <KILL> { s = span(); } <SCAN>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);
+        if (!IgniteSqlKill.isUuid(rawUuid)) {
+            throw SqlUtil.newContextException(getPos(), IgniteResource.INSTANCE.illegalUuid(rawUuid));
+        }
+        originNodeId = SqlLiteral.createCharString(rawUuid, getPos());
+    }
+    <QUOTED_STRING> {
+        cacheName = SqlLiteral.createCharString(SqlParserUtil.parseString(token.image), getPos());
+    }
+    queryId = QueryIdLiteral() {
+        return IgniteSqlKill.createScanQueryKill(s.end(this), originNodeId, cacheName, queryId);
+    }
+}
+
+SqlNode SqlKillContinuousQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral routineId;
+    String rawUuid;
+}{
+    <KILL> { s = span(); } <CONTINUOUS>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);

Review comment:
       Perhaps we can create a new method and deduplicate a little bit (also inline `IgniteSqlKill.isUuid` and get rid of this method in `IgniteSqlKill`)
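
One way the deduplication suggested above could look is a single helper that both parser rules call. This is only a sketch under stated assumptions: `UuidLiterals` and `parseStrict` are hypothetical names that do not appear in the pull request, and the plain `IllegalArgumentException` stands in for whatever parser exception the real grammar would raise.

```java
import java.util.UUID;

/** Hypothetical helper, not part of the pull request. */
class UuidLiterals {
    /**
     * Parses a UUID string in canonical form or fails with a descriptive error.
     * The parsed value is rendered back and compared with the input, because
     * UUID.fromString may accept shortened groups on some JDK versions.
     */
    static UUID parseStrict(String raw) {
        UUID id;

        try {
            id = UUID.fromString(raw);
        }
        catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Illegal UUID: " + raw, e);
        }

        // Reject inputs that parse but are not in the canonical 8-4-4-4-12 form.
        if (!id.toString().equalsIgnoreCase(raw))
            throw new IllegalArgumentException("Illegal UUID: " + raw);

        return id;
    }
}
```

With such a helper, each `KILL` rule would need only one call per UUID argument, and a separate `isUuid` check would no longer be required.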

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {

Review comment:
       `finally` should be on the next line according to codestyle
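
For illustration, the brace placement requested above could look like the following. It is a minimal sketch: the class and method names are hypothetical and only the position of `finally` matters.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical example, not part of the pull request. */
class FinallyPlacementSketch {
    /** Returns the latch count observed before the latch is released. */
    static long readThenRelease(CountDownLatch latch) {
        try {
            return latch.getCount();
        }
        finally {
            // The closing brace of 'try' ends its own line; 'finally' starts the next one.
            latch.countDown();
        }
    }
}
```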

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {
+            computeLatch.countDown();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCancelTx() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        int testKey = PAGES_CNT * (PAGE_SZ + 1);
+
+        try (Transaction tx = client.transactions().txStart()) {
+            cache.put(testKey, 1);
+
+            executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+
+            assertThrowsWithCause(tx::commit, IgniteException.class);
+        }
+
+        assertNull(cache.get(testKey));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelService() throws Exception {
+        String serviceName = "MY_SERVICE";
+
+        ServiceConfiguration scfg = new ServiceConfiguration();
+        scfg.setName(serviceName);
+        scfg.setMaxPerNodeCount(1);
+        scfg.setNodeFilter(grid(0).cluster().predicate());
+        scfg.setService(new TestServiceImpl());
+
+        client.services().deploy(scfg);
+
+        TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
+        assertNotNull(svc);
+
+        executeSql(client, "KILL SERVICE '" + serviceName + "'");
+        boolean res = waitForCondition(() -> grid(0).services().serviceDescriptors().isEmpty(), TIMEOUT);
+        assertTrue(res);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelContinuousQuery() throws Exception {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        cq.setInitialQuery(new ScanQuery<>());
+        cq.setTimeInterval(1_000L);
+        cq.setPageSize(PAGE_SZ);
+        cq.setLocalListener(events -> {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) {
+                assertNotNull(e);
+
+                cntr.incrementAndGet();
+            }
+        });
+
+        cache.query(cq);
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        boolean res = waitForCondition(() -> cntr.get() == PAGE_SZ * PAGE_SZ, TIMEOUT);
+        assertTrue(res);
+
+        Map<UUID, Object> routines = getFieldValue(grid(0).context().continuous(), "rmtInfos");

Review comment:
       Let's use the continuous queries system view instead of reflection, for example:
   ```
           SystemView<ContinuousQueryView> qrys = grid(0).context().systemView().view(CQ_SYS_VIEW);
           assertEquals(1, qrys.size());
           ContinuousQueryView view = qrys.iterator().next();
   
           UUID nodeId = view.nodeId();
           UUID routineId = view.routineId();
   ```

##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {
+            computeLatch.countDown();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCancelTx() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        int testKey = PAGES_CNT * (PAGE_SZ + 1);
+
+        try (Transaction tx = client.transactions().txStart()) {
+            cache.put(testKey, 1);
+
+            executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+
+            assertThrowsWithCause(tx::commit, IgniteException.class);
+        }
+
+        assertNull(cache.get(testKey));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelService() throws Exception {
+        String serviceName = "MY_SERVICE";
+
+        ServiceConfiguration scfg = new ServiceConfiguration();
+        scfg.setName(serviceName);
+        scfg.setMaxPerNodeCount(1);
+        scfg.setNodeFilter(grid(0).cluster().predicate());
+        scfg.setService(new TestServiceImpl());
+
+        client.services().deploy(scfg);
+
+        TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
+        assertNotNull(svc);
+
+        executeSql(client, "KILL SERVICE '" + serviceName + "'");
+        boolean res = waitForCondition(() -> grid(0).services().serviceDescriptors().isEmpty(), TIMEOUT);
+        assertTrue(res);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelContinuousQuery() throws Exception {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        cq.setInitialQuery(new ScanQuery<>());
+        cq.setTimeInterval(1_000L);
+        cq.setPageSize(PAGE_SZ);
+        cq.setLocalListener(events -> {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) {
+                assertNotNull(e);
+
+                cntr.incrementAndGet();
+            }
+        });
+
+        cache.query(cq);
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        boolean res = waitForCondition(() -> cntr.get() == PAGE_SZ * PAGE_SZ, TIMEOUT);
+        assertTrue(res);
+
+        Map<UUID, Object> routines = getFieldValue(grid(0).context().continuous(), "rmtInfos");
+        assertEquals(1, routines.size());
+        Map.Entry<UUID, Object> entry = routines.entrySet().iterator().next();
+
+        UUID nodeId = getFieldValue(entry.getValue(), "nodeId");
+        UUID routineId = entry.getKey();
+
+        executeSql(client, "KILL CONTINUOUS '" + nodeId + "' '" + routineId + "'");
+
+        long cnt = cntr.get();
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        res = waitForCondition(() -> cntr.get() > cnt, TIMEOUT);

Review comment:
       The timeout is too large for waiting on something that should not happen.
   Let's just wait for the CQ system view to become empty and check the counter after that:
   ```
           assertTrue(waitForCondition(() -> F.isEmpty(qrys), TIMEOUT));
           assertEquals(cnt, cntr.get());
   ```
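
The reasoning behind this suggestion can be shown with a small polling helper. This is a sketch under stated assumptions: `WaitSketch.waitFor` is a hypothetical stand-in, not the actual `GridTestUtils.waitForCondition` implementation, which is not shown in this thread.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for a polling test utility. */
class WaitSketch {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true as soon as the condition holds, false on timeout or interruption.
     */
    static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;

        while (!cond.getAsBoolean()) {
            // Overflow-safe deadline check.
            if (System.nanoTime() - deadline >= 0)
                return false;

            try {
                Thread.sleep(10);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();

                return false;
            }
        }

        return true;
    }
}
```

A condition that is expected never to hold, such as the counter growing after the query is killed, always costs the full timeout in a passing test. A condition that is expected to hold, such as the system view becoming empty, lets the helper return as soon as it is satisfied.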

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/SqlToNativeCommandConverter.java
##########
@@ -165,4 +182,56 @@ private static SqlAlterUserCommand convertAlterUser(IgniteSqlAlterUser sqlCmd, P
     private static SqlDropUserCommand convertDropUser(IgniteSqlDropUser sqlCmd, PlanningContext ctx) {
         return new SqlDropUserCommand(sqlCmd.user().getSimple());
     }
+
+    /**
+     * Converts KILL ... command.
+     */
+    private static SqlCommand convertKill(IgniteSqlKill cmd, PlanningContext pctx) {
+        if (cmd instanceof IgniteSqlKillScanQuery) {
+            IgniteSqlKillScanQuery cmd0 = (IgniteSqlKillScanQuery)cmd;
+            return new SqlKillScanQueryCommand(
+                UUID.fromString(cmd0.nodeId().getValueAs(String.class)),
+                cmd0.cacheName().getValueAs(String.class),
+                cmd0.queryId().longValue(true)
+            );
+        }
+        else if (cmd instanceof IgniteSqlKillContinuousQuery) {
+            IgniteSqlKillContinuousQuery cmd0 = (IgniteSqlKillContinuousQuery)cmd;
+            return new SqlKillContinuousQueryCommand(
+                UUID.fromString(cmd0.nodeId().getValueAs(String.class)),
+                UUID.fromString(cmd0.routineId().getValueAs(String.class))
+            );
+        }
+        else if (cmd instanceof IgniteSqlKillService) {
+            IgniteSqlKillService cmd0 = (IgniteSqlKillService)cmd;
+            return new SqlKillServiceCommand(cmd0.serviceName().getValueAs(String.class));
+        }
+        else if (cmd instanceof IgniteSqlKillTransaction) {
+            IgniteSqlKillTransaction cmd0 = (IgniteSqlKillTransaction)cmd;
+            return new SqlKillTransactionCommand(cmd0.xid().getValueAs(String.class));
+        }
+        else if (cmd instanceof IgniteSqlKillComputeTask) {
+            IgniteSqlKillComputeTask cmd0 = ( IgniteSqlKillComputeTask)cmd;
+            IgniteUuid sessId = IgniteUuid.fromString(cmd0.sessionId().getValueAs(String.class));
+            return new SqlKillComputeTaskCommand(sessId);
+        }
+        else {
+            throw new IgniteSQLException("Unsupported native operation [" +
+                "cmdName=" + (cmd == null ? null : cmd.getClass().getSimpleName()) + "; " +
+                "querySql=\"" + pctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        }
+    }
+
+    /**
+     * Convert string literal to UUID.
+     */
+    private static UUID convertToUuid(SqlLiteral literal) {

Review comment:
       Not used




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #9412: IGNITE-15423 Support for kill queries.

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #9412:
URL: https://github.com/apache/ignite/pull/9412#discussion_r710966386



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {
+            computeLatch.countDown();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCancelTx() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        int testKey = PAGES_CNT * (PAGE_SZ + 1);
+
+        try (Transaction tx = client.transactions().txStart()) {
+            cache.put(testKey, 1);
+
+            executeSql(client, "KILL TRANSACTION '" + tx.xid() + "'");
+
+            assertThrowsWithCause(tx::commit, IgniteException.class);
+        }
+
+        assertNull(cache.get(testKey));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelService() throws Exception {
+        String serviceName = "MY_SERVICE";
+
+        ServiceConfiguration scfg = new ServiceConfiguration();
+        scfg.setName(serviceName);
+        scfg.setMaxPerNodeCount(1);
+        scfg.setNodeFilter(grid(0).cluster().predicate());
+        scfg.setService(new TestServiceImpl());
+
+        client.services().deploy(scfg);
+
+        TestService svc = client.services().serviceProxy(serviceName, TestService.class, true);
+        assertNotNull(svc);
+
+        executeSql(client, "KILL SERVICE '" + serviceName + "'");
+        boolean res = waitForCondition(() -> grid(0).services().serviceDescriptors().isEmpty(), TIMEOUT);
+        assertTrue(res);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testCancelContinuousQuery() throws Exception {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        cq.setInitialQuery(new ScanQuery<>());
+        cq.setTimeInterval(1_000L);
+        cq.setPageSize(PAGE_SZ);
+        cq.setLocalListener(events -> {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) {
+                assertNotNull(e);
+
+                cntr.incrementAndGet();
+            }
+        });
+
+        cache.query(cq);
+
+        for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
+            cache.put(i, i);
+
+        boolean res = waitForCondition(() -> cntr.get() == PAGE_SZ * PAGE_SZ, TIMEOUT);
+        assertTrue(res);
+
+        Map<UUID, Object> routines = getFieldValue(grid(0).context().continuous(), "rmtInfos");

Review comment:
       System views are not available in CALCITE currently :(







[GitHub] [ignite] ivandasch commented on a change in pull request #9412: IGNITE-15423 Support for kill queries.

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #9412:
URL: https://github.com/apache/ignite/pull/9412#discussion_r710976786



##########
File path: modules/calcite/src/main/codegen/includes/parserImpls.ftl
##########
@@ -391,3 +391,111 @@ SqlDrop SqlDropUser(Span s, boolean replace) :
         return new IgniteSqlDropUser(s.end(this), user);
     }
 }
+
+SqlNumericLiteral QueryIdLiteral() :
+{
+    final Span s;
+}
+{
+    <PLUS> <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+|
+    <MINUS> { s = span(); } <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createNegative(SqlLiteral.createExactNumeric(token.image, getPos()), s.end(this));
+    }
+|
+    <UNSIGNED_INTEGER_LITERAL> {
+        return SqlLiteral.createExactNumeric(token.image, getPos());
+    }
+}
+
+SqlNode SqlKillScanQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral cacheName;
+    final SqlNumericLiteral queryId;
+    final String rawUuid;
+}{
+    <KILL> { s = span(); } <SCAN>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);
+        if (!IgniteSqlKill.isUuid(rawUuid)) {
+            throw SqlUtil.newContextException(getPos(), IgniteResource.INSTANCE.illegalUuid(rawUuid));
+        }
+        originNodeId = SqlLiteral.createCharString(rawUuid, getPos());
+    }
+    <QUOTED_STRING> {
+        cacheName = SqlLiteral.createCharString(SqlParserUtil.parseString(token.image), getPos());
+    }
+    queryId = QueryIdLiteral() {
+        return IgniteSqlKill.createScanQueryKill(s.end(this), originNodeId, cacheName, queryId);
+    }
+}
+
+SqlNode SqlKillContinuousQuery():
+{
+    final Span s;
+    final SqlCharStringLiteral originNodeId;
+    final SqlCharStringLiteral routineId;
+    String rawUuid;
+}{
+    <KILL> { s = span(); } <CONTINUOUS>
+    <QUOTED_STRING> {
+        rawUuid = SqlParserUtil.parseString(token.image);

Review comment:
       `SqlParserUtil` is an internal Calcite class; we have `SqlParserUtils`. Is it OK to import it?
   If so, great, I'll add these methods to `SqlParserUtils`.
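
As an illustration of the kind of helper that could live in a project-owned utility class, here is a minimal sketch of a UUID literal check. The class name `UuidLiteralCheck` is hypothetical, and this is not the code of `IgniteSqlKill.isUuid` from the PR; the length check is an added assumption, because `UUID.fromString` alone accepts some shortened forms.

```java
import java.util.UUID;

/** Hypothetical helper; a sketch of validating a raw string as a UUID literal. */
public final class UuidLiteralCheck {
    private UuidLiteralCheck() {
        // No instances.
    }

    /**
     * @param raw Unquoted string taken from the SQL literal.
     * @return {@code true} if the string has the canonical 36-character UUID form.
     */
    public static boolean isUuid(String raw) {
        // Canonical textual UUIDs are exactly 36 characters long (32 hex digits and 4 dashes).
        if (raw == null || raw.length() != 36)
            return false;

        try {
            UUID.fromString(raw);

            return true;
        }
        catch (IllegalArgumentException e) {
            // Thrown for malformed input (NumberFormatException is a subclass).
            return false;
        }
    }
}
```

With a check like this the parser can reject a malformed node ID at parse time with a positioned error instead of failing later during command conversion.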







[GitHub] [ignite] ivandasch commented on a change in pull request #9412: IGNITE-15423 Support for kill queries.

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #9412:
URL: https://github.com/apache/ignite/pull/9412#discussion_r710970224



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillCommandDdlIntegrationTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for KILL queries.
+ */
+public class KillCommandDdlIntegrationTest extends AbstractDdlIntegrationTest {
+    /** Page size. */
+    public static final int PAGE_SZ = 5;
+
+    /** Number of pages to insert. */
+    public static final int PAGES_CNT = 1000;
+
+    /** Operations timeout. */
+    public static final int TIMEOUT = 10_000;
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        // There must be enough cache entries to keep scan query cursor opened.
+        // Cursor may be concurrently closed when all the data retrieved.
+        for (int i = 0; i < PAGES_CNT * PAGE_SZ; i++)
+            cache.put(i, i);
+    }
+
+    /** */
+    @Override public void cleanUp() {
+        // No-op.
+    }
+
+    /** */
+    @Test
+    public void testCancelScanQuery() {
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> scanQry = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
+        Iterator<Cache.Entry<Object, Object>> scanQryIter = scanQry.iterator();
+
+        // Fetch first entry and therefore caching first page.
+        assertNotNull(scanQryIter.next());
+
+        ConcurrentMap<UUID, GridCacheQueryManager<Object, Object>.RequestFutureMap> qryIters =
+            grid(0).context().cache().cache(DEFAULT_CACHE_NAME).context().queries().queryIterators();
+
+        assertEquals(qryIters.values().size(), 1);
+
+        long qryId = qryIters.values().iterator().next().keySet().iterator().next();
+        UUID originNodeId = client.cluster().localNode().id();
+
+        executeSql(client, "KILL SCAN '" + originNodeId + "' '" + DEFAULT_CACHE_NAME + "' " + qryId);
+
+        // Fetch all cached entries.
+        for (int i = 0; i < PAGE_SZ * servers().size() - 1; i++)
+            assertNotNull(scanQryIter.next());
+
+        // Fetch of the next page should throw the exception.
+        assertThrowsWithCause(scanQryIter::next, IgniteCheckedException.class);
+    }
+
+    /** */
+    @Test
+    public void testCancelComputeTask() {
+        CountDownLatch computeLatch = new CountDownLatch(1);
+
+        IgniteFuture<Collection<Integer>> fut = client.compute().broadcastAsync(() -> {
+            computeLatch.await();
+
+            return 1;
+        });
+
+        try {
+            IgniteUuid taskId = client.compute().activeTaskFutures().keySet().iterator().next();
+
+            executeSql(client, "KILL COMPUTE '" + taskId + "'");
+
+            assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
+        } finally {

Review comment:
       Oops, sorry. We need to create a code style check for it.
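
The remark presumably concerns `finally` sitting on the same line as the closing brace, whereas the rest of this diff puts `else if` on its own line. Below is a small sketch of the same try/finally shape with `finally` on a new line; `FinallyStyleSketch` and `releaseAfter` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical example of the brace placement discussed in the comment. */
public final class FinallyStyleSketch {
    private FinallyStyleSketch() {
        // No instances.
    }

    /**
     * Runs the body and always releases the latch afterwards.
     *
     * @return Remaining latch count after the release.
     */
    public static long releaseAfter(Runnable body) {
        CountDownLatch latch = new CountDownLatch(1);

        try {
            body.run();
        }
        finally {
            // Executed whether or not the body throws.
            latch.countDown();
        }

        return latch.getCount();
    }
}
```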



