You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/03/27 14:03:15 UTC

[ignite] branch ignite-11564 updated: IGNITE-11452: Throw exception when cancel is attempted on a command which do not support it.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch ignite-11564
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-11564 by this push:
     new 03066a5  IGNITE-11452: Throw exception when cancel is attempted on a command which do not support it.
03066a5 is described below

commit 03066a510a0c3cd2980cf57e353d3d37001a42d1
Author: Yuriy Gerzhedovich <yg...@gridgain.com>
AuthorDate: Wed Mar 27 17:03:05 2019 +0300

    IGNITE-11452: Throw exception when cancel is attempted on a command which do not support it.
---
 .../processors/query/h2/CommandProcessor.java      |   7 +-
 .../processors/query/h2/IgniteH2Indexing.java      |   7 +-
 .../internal/processors/query/KillQueryTest.java   | 363 ++++++++++++++++++---
 3 files changed, 323 insertions(+), 54 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index f360e28..9f2ae7e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -299,8 +299,9 @@ public class CommandProcessor {
         GridRunningQueryInfo runningQryInfo = idx.runningQueryManager().runningQueryInfo(qryId);
 
         if (runningQryInfo == null)
-            err = "Failed to cancel query due to query doesn't exist " +
-                "[nodeId=" + ctx.localNodeId() + ",qryId=" + qryId + "]";
+            err = "Query with provided ID doesn't exist [nodeId=" + ctx.localNodeId() + ", qryId=" + qryId + "]";
+        else if (!runningQryInfo.cancelable())
+            err = "Query doesn't support cancellation [nodeId=" + ctx.localNodeId() + ", qryId=" + qryId + "]";
 
         if (msg.asyncResponse()) {
             snd = idx.send(GridTopic.TOPIC_QUERY,
@@ -314,7 +315,7 @@ public class CommandProcessor {
         }
 
         try {
-            if (runningQryInfo != null)
+            if (err == null)
                 runningQryInfo.cancel();
         }
         catch (Exception e) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 39a606c..b5e807b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1339,9 +1339,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         boolean keepBinary,
         GridQueryCancel cancel
     ) {
-        if (cancel == null)
-            cancel = new GridQueryCancel();
-
         // Check security.
         if (ctx.security().enabled())
             checkSecurity(select.cacheIds());
@@ -2304,6 +2301,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 try {
                     List<List<List<?>>> cur = plan.createRows(argss);
 
+                    //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
                     ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize());
                 }
                 finally {
@@ -2573,6 +2571,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         int pageSize = loc ? 0 : qryParams.pageSize();
 
+        //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
         return DmlUtils.processSelectResult(plan, cur, pageSize);
     }
 
@@ -2662,6 +2661,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     it = plan.iteratorForTransaction(connMgr, cur);
                 }
 
+                //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
                 IgniteInternalFuture<Long> fut = tx.updateAsync(
                     cctx,
                     it,
@@ -2700,6 +2700,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (parts != null && parts.length == 0)
                 return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
             else {
+                //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
                 IgniteInternalFuture<Long> fut = tx.updateAsync(
                     cctx,
                     ids,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index 369099a..aeb6da1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -24,14 +24,23 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import javax.cache.CacheException;
-import junit.framework.Assert;
+import org.apache.ignite.Ignite;
+import org.junit.Assert;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -40,7 +49,12 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -99,6 +113,12 @@ public class KillQueryTest extends GridCommonAbstractTest {
     /** Node configration conter. */
     private static int cntr;
 
+    /** Table count. */
+    private static AtomicInteger tblCnt = new AtomicInteger();
+
+    /** Barrier. */
+    private static volatile CyclicBarrier barrier;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -113,14 +133,38 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(cache);
 
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+        if (++cntr == NODES_COUNT)
+            cfg.setClientMode(true);
 
-        disco.setIpFinder(IP_FINDER);
+        cfg.setDiscoverySpi(new TcpDiscoverySpi() {
 
-        cfg.setDiscoverySpi(disco);
+            @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+                if (CustomMessageWrapper.class.isAssignableFrom(msg.getClass())) {
+                    DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();
 
-        if (++cntr == NODES_COUNT)
-            cfg.setClientMode(true);
+                    if (DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) {
+                        try {
+                            awaitTimeout();
+                        }
+                        catch (Exception e) {
+                            log.error(e.getMessage(), e);
+                        }
+
+                    }
+                    else
+                        if (SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) {
+                        try {
+                            awaitTimeout();
+                        }
+                        catch (Exception e) {
+                            log.error(e.getMessage(), e);
+                        }
+                    }
+                }
+
+                super.sendCustomEvent(msg);
+            }
+        }.setIpFinder(IP_FINDER));
 
         return cfg;
     }
@@ -156,6 +200,10 @@ public class KillQueryTest extends GridCommonAbstractTest {
     public void before() throws Exception {
         TestSQLFunctions.init();
 
+        newBarrier(1);
+
+        tblCnt.incrementAndGet();
+
         conn = GridTestUtils.connect(grid(0), null);
 
         conn.setSchema('"' + GridAbstractTest.DEFAULT_CACHE_NAME + '"');
@@ -186,6 +234,216 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Trying to cancel COPY FROM command; the cancel request is expected to be rejected as unsupported.
+     * NOTE(review): confirm that COPY FROM reaches the discovery barrier this scenario relies on.
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testBulkLoadCancellationUnsupported() throws Exception {
+        testBulkLoadCancellationUnsupported(false);
+    }
+
+    /**
+     * Trying to async cancel COPY FROM command; the cancel request is expected to be rejected as unsupported.
+     * NOTE(review): confirm that COPY FROM reaches the discovery barrier this scenario relies on.
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testAsyncBulkLoadCancellationUnsupported() throws Exception {
+        testBulkLoadCancellationUnsupported(true);
+    }
+
+    /**
+     * Trying to cancel COPY FROM command.
+     *
+     * @param async execute cancellation in ASYNC mode.
+     * @throws Exception In case of failure.
+     */
+    public void testBulkLoadCancellationUnsupported(boolean async) throws Exception {
+        String path = Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
+            .getAbsolutePath();
+
+        String sqlPrepare = "CREATE TABLE " + currentTestTableName() +
+            "(id integer primary key, age integer, firstName varchar, lastname varchar)";
+        String sqlCmd = "COPY FROM '" + path + "'" +
+            " INTO " + currentTestTableName() +
+            " (_key, age, firstName, lastName)" +
+            " format csv charset 'ascii'";
+
+        testCancellationUnsupported(
+            Arrays.asList(sqlPrepare),
+            sqlCmd,
+            async);
+    }
+
+    /**
+     * Trying to cancel CREATE TABLE command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testCreateTableCancellationUnsupported() throws Exception {
+        testCreateTableCancellationUnsupported(false);
+    }
+
+    /**
+     * Trying to async cancel CREATE TABLE command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testAsyncCreateTableCancellationUnsupported() throws Exception {
+        testCreateTableCancellationUnsupported(true);
+    }
+
+    /**
+     * Trying to cancel CREATE TABLE command.
+     *
+     * @param async execute cancellation in ASYNC mode.
+     * @throws Exception In case of failure.
+     */
+    public void testCreateTableCancellationUnsupported(boolean async) throws Exception {
+        testCancellationUnsupported(Collections.<String>emptyList(),
+            "CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
+            async);
+    }
+
+    /**
+     * Trying to cancel ALTER TABLE command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testAlterTableCancellationUnsupported() throws Exception {
+        testAlterTableCancellationUnsupported(false);
+    }
+
+    /**
+     * Trying to async cancel ALTER TABLE command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testAsyncAlterTableCancellationUnsupported() throws Exception {
+        testAlterTableCancellationUnsupported(true);
+    }
+
+    /**
+     * Trying to cancel ALTER TABLE command.
+     *
+     * @param async execute cancellation in ASYNC mode.
+     * @throws Exception In case of failure.
+     */
+    private void testAlterTableCancellationUnsupported(boolean async) throws Exception {
+        testCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
+            "ALTER TABLE " + currentTestTableName() + " ADD COLUMN COL VARCHAR",
+            async);
+    }
+
+    /**
+     * Trying to cancel CREATE INDEX command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testCreateIndexCancellationUnsupported() throws Exception {
+        testCreateIndexCancellationUnsupported(false);
+    }
+
+    /**
+     * Trying to async cancel CREATE INDEX command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testAsyncCreateIndexCancellationUnsupported() throws Exception {
+        testCreateIndexCancellationUnsupported(true);
+    }
+
+    /**
+     * Trying to cancel CREATE INDEX command.
+     *
+     * @param async execute cancellation in ASYNC mode.
+     * @throws Exception In case of failure.
+     */
+    private void testCreateIndexCancellationUnsupported(boolean async) throws Exception {
+        testCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
+            "CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)",
+            async);
+    }
+
+
+    /**
+     * Trying to cancel DROP INDEX command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testDropIndexCancellationUnsupported() throws Exception {
+        testDropIndexCancellationUnsupported(false);
+    }
+
+    /**
+     * Trying to async cancel DROP INDEX command.
+     *
+     * @throws Exception In case of failure.
+     */
+    @Test
+    public void testAsyncDropIndexCancellationUnsupported() throws Exception {
+        testDropIndexCancellationUnsupported(true);
+    }
+
+    /**
+     * Trying to cancel DROP INDEX command.
+     *
+     * @param async execute cancellation in ASYNC mode.
+     * @throws Exception In case of failure.
+     */
+    private void testDropIndexCancellationUnsupported(boolean async) throws Exception {
+        testCancellationUnsupported(
+            Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
+            "CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)"),
+            "DROP INDEX " + currentTestTableName() + "_IDX",
+            async);
+    }
+
+    /**
+     * Returns the test table name, which is unique per test but stable within a single test run.
+     *
+     * @return Test table name built from the current value of the table counter.
+     */
+    private String currentTestTableName() {
+        return "TST_TABLE_" + tblCnt.get();
+    }
+
+    /**
+     * Checks that an attempt to cancel {@code sqlCmd} fails because cancellation is not supported for it.
+     *
+     * @param prepareSteps Preparation SQLs before start test.
+     * @param sqlCmd Command which can't be cancelled
+     * @param async Execute cancellation in ASYNC mode.
+     * @throws Exception In case of failure.
+     */
+    private void testCancellationUnsupported(List<String> prepareSteps, String sqlCmd, boolean async) throws Exception {
+        for (String sql : prepareSteps) {
+            try {
+                stmt.execute(sql);
+            }
+            catch (SQLException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        newBarrier(2);
+
+        IgniteInternalFuture cancelRes = cancelQueryWithBarrier(sqlCmd, "Query doesn't support cancellation", async);
+
+        stmt.execute(sqlCmd);
+
+        cancelRes.get(TIMEOUT);
+    }
+
+    /**
      * Trying to cancel unexist query.
      */
     @Test
@@ -214,7 +472,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
                 .query(createKillQuery(nodeId, Long.MAX_VALUE, async));
 
             return null;
-        }, CacheException.class, "Failed to cancel query due to query doesn't exist [nodeId=" + nodeId);
+        }, CacheException.class, "Query with provided ID doesn't exist [nodeId=" + nodeId);
     }
 
     /**
@@ -450,60 +708,51 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Trying to cancel long running file upload. No exceptions expected.
+     * Tries to cancel the running query that waits on the barrier and expects the attempt to fail.
      *
-     * @throws Exception If failed.
+     * @param qry Text of the running query to try to cancel.
+     * @param expErrMsg Expected error message during cancellation.
+     * @param async Execute cancellation in ASYNC mode.
+     * @return <code>IgniteInternalFuture</code> to check whether exception was thrown.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11176")
-    @Test
-    public void testCancellingLongRunningFileUpload() throws Exception {
-        testCancellingLongRunningFileUpload(false);
-    }
+    private IgniteInternalFuture cancelQueryWithBarrier(String qry, String expErrMsg, boolean async) {
+        return GridTestUtils.runAsync(() -> {
+            try {
+                List<GridRunningQueryInfo> runningQueries = new ArrayList<>();
 
-    /**
-     * Trying to async cancel long running file upload. No exceptions expected.
-     *
-     * @throws Exception If failed.
-     */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11176")
-    @Test
-    public void testAsyncCancellingLongRunningFileUpload() throws Exception {
-        testCancellingLongRunningFileUpload(true);
-    }
+                GridTestUtils.waitForCondition(() -> {
+                    List<GridRunningQueryInfo> r = (List<GridRunningQueryInfo>)ignite.context().query()
+                        .runningQueries(-1);
 
-    /**
-     * Trying to cancel long running file upload. No exceptions expected.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception If failed.
-     */
-    private void testCancellingLongRunningFileUpload(boolean async) throws Exception {
-        IgniteInternalFuture cancelRes = GridTestUtils.runAsync(() -> {
-            try {
-                Thread.sleep(200);
+                    runningQueries.addAll(r.stream().filter(q -> q.query().equals(qry)).collect(Collectors.toList()));
 
-                GridRunningQueryInfo runQry = ignite.context().query().runningQueries(-1).iterator().next();
+                    return !runningQueries.isEmpty();
+                }, TIMEOUT);
 
-                igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(runQry.globalQueryId(), async));
+                assertFalse(runningQueries.isEmpty());
+
+                for (GridRunningQueryInfo runningQuery : runningQueries) {
+                    GridTestUtils.assertThrowsAnyCause(log,
+                        () -> igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(runningQuery.globalQueryId(), async)),
+                        CacheException.class, expErrMsg);
+                }
             }
             catch (Exception e) {
                 log.error("Unexpected exception.", e);
 
                 Assert.fail("Unexpected exception");
             }
-        });
-
-        GridTestUtils.assertThrows(log, () -> {
-            stmt.executeUpdate(
-                "copy from '" + BULKLOAD_20_000_LINE_CSV_FILE + "' into Person" +
-                    " (_key, age, firstName, lastName)" +
-                    " format csv");
-
-            return null;
-        }, SQLException.class, "The query was cancelled while executing.");
+            finally {
+                try {
+                    awaitTimeout();
+                }
+                catch (Exception e) {
+                    log.error("Unexpected exception.", e);
 
-        // Ensure that there were no exceptions within async cancellation process.
-        cancelRes.get(CHECK_RESULT_TIMEOUT);
+                    Assert.fail("Unexpected exception");
+                }
+            }
+        });
     }
 
     /**
@@ -570,6 +819,24 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Create and set new CyclicBarrier for the function.
+     *
+     * @param parties the number of threads that must invoke await method before the barrier is tripped
+     */
+    private static void newBarrier(int parties) {
+        barrier = new CyclicBarrier(parties);
+    }
+
+    /**
+     * @throws InterruptedException In case of failure.
+     * @throws TimeoutException In case of failure.
+     * @throws BrokenBarrierException In case of failure.
+     */
+    private static void awaitTimeout() throws InterruptedException, TimeoutException, BrokenBarrierException {
+        barrier.await(TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /**
      * Utility class with custom SQL functions.
      */
     public static class TestSQLFunctions {