You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/03/27 14:03:15 UTC
[ignite] branch ignite-11564 updated: IGNITE-11452: Throw exception
when cancel is attempted on a command which do not support it.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch ignite-11564
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-11564 by this push:
new 03066a5 IGNITE-11452: Throw exception when cancel is attempted on a command which do not support it.
03066a5 is described below
commit 03066a510a0c3cd2980cf57e353d3d37001a42d1
Author: Yuriy Gerzhedovich <yg...@gridgain.com>
AuthorDate: Wed Mar 27 17:03:05 2019 +0300
IGNITE-11452: Throw exception when cancel is attempted on a command which do not support it.
---
.../processors/query/h2/CommandProcessor.java | 7 +-
.../processors/query/h2/IgniteH2Indexing.java | 7 +-
.../internal/processors/query/KillQueryTest.java | 363 ++++++++++++++++++---
3 files changed, 323 insertions(+), 54 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index f360e28..9f2ae7e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -299,8 +299,9 @@ public class CommandProcessor {
GridRunningQueryInfo runningQryInfo = idx.runningQueryManager().runningQueryInfo(qryId);
if (runningQryInfo == null)
- err = "Failed to cancel query due to query doesn't exist " +
- "[nodeId=" + ctx.localNodeId() + ",qryId=" + qryId + "]";
+ err = "Query with provided ID doesn't exist [nodeId=" + ctx.localNodeId() + ", qryId=" + qryId + "]";
+ else if (!runningQryInfo.cancelable())
+ err = "Query doesn't support cancellation [nodeId=" + ctx.localNodeId() + ", qryId=" + qryId + "]";
if (msg.asyncResponse()) {
snd = idx.send(GridTopic.TOPIC_QUERY,
@@ -314,7 +315,7 @@ public class CommandProcessor {
}
try {
- if (runningQryInfo != null)
+ if (err == null)
runningQryInfo.cancel();
}
catch (Exception e) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 39a606c..b5e807b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1339,9 +1339,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean keepBinary,
GridQueryCancel cancel
) {
- if (cancel == null)
- cancel = new GridQueryCancel();
-
// Check security.
if (ctx.security().enabled())
checkSecurity(select.cacheIds());
@@ -2304,6 +2301,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
List<List<List<?>>> cur = plan.createRows(argss);
+ //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize());
}
finally {
@@ -2573,6 +2571,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int pageSize = loc ? 0 : qryParams.pageSize();
+ //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
return DmlUtils.processSelectResult(plan, cur, pageSize);
}
@@ -2662,6 +2661,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
it = plan.iteratorForTransaction(connMgr, cur);
}
+ //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
IgniteInternalFuture<Long> fut = tx.updateAsync(
cctx,
it,
@@ -2700,6 +2700,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (parts != null && parts.length == 0)
return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
else {
+ //https://issues.apache.org/jira/browse/IGNITE-11176 - Need to support cancellation
IgniteInternalFuture<Long> fut = tx.updateAsync(
cctx,
ids,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index 369099a..aeb6da1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -24,14 +24,23 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.cache.CacheException;
-import junit.framework.Assert;
+import org.apache.ignite.Ignite;
+import org.junit.Assert;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -40,7 +49,12 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -99,6 +113,12 @@ public class KillQueryTest extends GridCommonAbstractTest {
/** Node configration conter. */
private static int cntr;
+ /** Table count. */
+ private static AtomicInteger tblCnt = new AtomicInteger();
+
+ /** Barrier. */
+ private static volatile CyclicBarrier barrier;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -113,14 +133,38 @@ public class KillQueryTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(cache);
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ if (++cntr == NODES_COUNT)
+ cfg.setClientMode(true);
- disco.setIpFinder(IP_FINDER);
+ cfg.setDiscoverySpi(new TcpDiscoverySpi() {
- cfg.setDiscoverySpi(disco);
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ if (CustomMessageWrapper.class.isAssignableFrom(msg.getClass())) {
+ DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();
- if (++cntr == NODES_COUNT)
- cfg.setClientMode(true);
+ if (DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) {
+ try {
+ awaitTimeout();
+ }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+
+ }
+ else
+ if (SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) {
+ try {
+ awaitTimeout();
+ }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ super.sendCustomEvent(msg);
+ }
+ }.setIpFinder(IP_FINDER));
return cfg;
}
@@ -156,6 +200,10 @@ public class KillQueryTest extends GridCommonAbstractTest {
public void before() throws Exception {
TestSQLFunctions.init();
+ newBarrier(1);
+
+ tblCnt.incrementAndGet();
+
conn = GridTestUtils.connect(grid(0), null);
conn.setSchema('"' + GridAbstractTest.DEFAULT_CACHE_NAME + '"');
@@ -186,6 +234,216 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
+ * Trying to cancel COPY FROM command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testBulkLoadCancellationUnsupported() throws Exception {
+ testCreateTableCancellationUnsupported(false);
+ }
+
+ /**
+ * Trying to async cancel COPY FROM command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testAsyncBulkLoadCancellationUnsupported() throws Exception {
+ testCreateTableCancellationUnsupported(true);
+ }
+
+ /**
+ * Trying to cancel COPY FROM command.
+ *
+ * @param async execute cancellation in ASYNC mode.
+ * @throws Exception In case of failure.
+ */
+ public void testBulkLoadCancellationUnsupported(boolean async) throws Exception {
+ String path = Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
+ .getAbsolutePath();
+
+ String sqlPrepare = "CREATE TABLE " + currentTestTableName() +
+ "(id integer primary key, age integer, firstName varchar, lastname varchar)";
+ String sqlCmd = "COPY FROM '" + path + "'" +
+ " INTO " + currentTestTableName() +
+ " (_key, age, firstName, lastName)" +
+ " format csv charset 'ascii'";
+
+ testCancellationUnsupported(
+ Arrays.asList(sqlPrepare),
+ sqlCmd,
+ async);
+ }
+
+ /**
+ * Trying to cancel CREATE TABLE command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testCreateTableCancellationUnsupported() throws Exception {
+ testCreateTableCancellationUnsupported(false);
+ }
+
+ /**
+ * Trying to async cancel CREATE TABLE command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testAsyncCreateTableCancellationUnsupported() throws Exception {
+ testCreateTableCancellationUnsupported(true);
+ }
+
+ /**
+ * Trying to cancel CREATE TABLE command.
+ *
+ * @param async execute cancellation in ASYNC mode.
+ * @throws Exception In case of failure.
+ */
+ public void testCreateTableCancellationUnsupported(boolean async) throws Exception {
+ testCancellationUnsupported(Collections.<String>emptyList(),
+ "CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
+ async);
+ }
+
+ /**
+ * Trying to cancel ALTER TABLE command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testAlterTableCancellationUnsupported() throws Exception {
+ testAlterTableCancellationUnsupported(false);
+ }
+
+ /**
+ * Trying to async cancel ALTER TABLE command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testAsyncAlterTableCancellationUnsupported() throws Exception {
+ testAlterTableCancellationUnsupported(true);
+ }
+
+ /**
+ * Trying to cancel ALTER TABLE command.
+ *
+ * @param async execute cancellation in ASYNC mode.
+ * @throws Exception In case of failure.
+ */
+ private void testAlterTableCancellationUnsupported(boolean async) throws Exception {
+ testCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
+ "ALTER TABLE " + currentTestTableName() + " ADD COLUMN COL VARCHAR",
+ async);
+ }
+
+ /**
+ * Trying to cancel CREATE INDEX command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testCreateIndexCancellationUnsupported() throws Exception {
+ testCreateIndexCancellationUnsupported(false);
+ }
+
+ /**
+ * Trying to async cancel CREATE INDEX command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testAsyncCreateIndexCancellationUnsupported() throws Exception {
+ testCreateIndexCancellationUnsupported(true);
+ }
+
+ /**
+ * Trying to cancel CREATE INDEX command.
+ *
+ * @param async execute cancellation in ASYNC mode.
+ * @throws Exception In case of failure.
+ */
+ private void testCreateIndexCancellationUnsupported(boolean async) throws Exception {
+ testCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
+ "CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)",
+ async);
+ }
+
+
+ /**
+ * Trying to cancel DROP INDEX command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testDropIndexCancellationUnsupported() throws Exception {
+ testDropIndexCancellationUnsupported(false);
+ }
+
+ /**
+ * Trying to async cancel DROP INDEX command.
+ *
+ * @throws Exception In case of failure.
+ */
+ @Test
+ public void testAsyncDropIndexCancellationUnsupported() throws Exception {
+ testDropIndexCancellationUnsupported(true);
+ }
+
+ /**
+ * Trying to cancel DROP INDEX command.
+ *
+ * @param async execute cancellation in ASYNC mode.
+ * @throws Exception In case of failure.
+ */
+ private void testDropIndexCancellationUnsupported(boolean async) throws Exception {
+ testCancellationUnsupported(
+ Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
+ "CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)"),
+ "DROP INDEX " + currentTestTableName() + "_IDX",
+ async);
+ }
+
+ /**
+ * Get test table name unique for per tests, but stable within one test run.
+ *
+ * @return Generated test table name unique for per tests, but stable within one test run.
+ */
+ private String currentTestTableName() {
+ return "TST_TABLE_" + tblCnt.get();
+ }
+
+ /**
+ * Check that trying cancellation execution of {@code sqlCmd} can't be cancelled due to it's not supported.
+ *
+ * @param prepareSteps Preparation SQLs before start test.
+ * @param sqlCmd Command which can't be cancelled
+ * @param async Execute cancellation in ASYNC mode.
+ * @throws Exception In case of failure.
+ */
+ private void testCancellationUnsupported(List<String> prepareSteps, String sqlCmd, boolean async) throws Exception {
+ for (String sql : prepareSteps) {
+ try {
+ stmt.execute(sql);
+ }
+ catch (SQLException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ newBarrier(2);
+
+ IgniteInternalFuture cancelRes = cancelQueryWithBarrier(sqlCmd, "Query doesn't support cancellation", async);
+
+ stmt.execute(sqlCmd);
+
+ cancelRes.get(TIMEOUT);
+ }
+
+ /**
* Trying to cancel unexist query.
*/
@Test
@@ -214,7 +472,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
.query(createKillQuery(nodeId, Long.MAX_VALUE, async));
return null;
- }, CacheException.class, "Failed to cancel query due to query doesn't exist [nodeId=" + nodeId);
+ }, CacheException.class, "Query with provided ID doesn't exist [nodeId=" + nodeId);
}
/**
@@ -450,60 +708,51 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
- * Trying to cancel long running file upload. No exceptions expected.
+ * Cancels current query which wait on barrier.
*
- * @throws Exception If failed.
+ * @param qry Query which need to try cancel.
+ * @param expErrMsg Exoected error message during cancellation.
+ * @param async Execute cancellation in ASYNC mode.
+ * @return <code>IgniteInternalFuture</code> to check whether exception was thrown.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-11176")
- @Test
- public void testCancellingLongRunningFileUpload() throws Exception {
- testCancellingLongRunningFileUpload(false);
- }
+ private IgniteInternalFuture cancelQueryWithBarrier(String qry, String expErrMsg, boolean async) {
+ return GridTestUtils.runAsync(() -> {
+ try {
+ List<GridRunningQueryInfo> runningQueries = new ArrayList<>();
- /**
- * Trying to async cancel long running file upload. No exceptions expected.
- *
- * @throws Exception If failed.
- */
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-11176")
- @Test
- public void testAsyncCancellingLongRunningFileUpload() throws Exception {
- testCancellingLongRunningFileUpload(true);
- }
+ GridTestUtils.waitForCondition(() -> {
+ List<GridRunningQueryInfo> r = (List<GridRunningQueryInfo>)ignite.context().query()
+ .runningQueries(-1);
- /**
- * Trying to cancel long running file upload. No exceptions expected.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception If failed.
- */
- private void testCancellingLongRunningFileUpload(boolean async) throws Exception {
- IgniteInternalFuture cancelRes = GridTestUtils.runAsync(() -> {
- try {
- Thread.sleep(200);
+ runningQueries.addAll(r.stream().filter(q -> q.query().equals(qry)).collect(Collectors.toList()));
- GridRunningQueryInfo runQry = ignite.context().query().runningQueries(-1).iterator().next();
+ return !runningQueries.isEmpty();
+ }, TIMEOUT);
- igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(runQry.globalQueryId(), async));
+ assertFalse(runningQueries.isEmpty());
+
+ for (GridRunningQueryInfo runningQuery : runningQueries) {
+ GridTestUtils.assertThrowsAnyCause(log,
+ () -> igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(runningQuery.globalQueryId(), async)),
+ CacheException.class, expErrMsg);
+ }
}
catch (Exception e) {
log.error("Unexpected exception.", e);
Assert.fail("Unexpected exception");
}
- });
-
- GridTestUtils.assertThrows(log, () -> {
- stmt.executeUpdate(
- "copy from '" + BULKLOAD_20_000_LINE_CSV_FILE + "' into Person" +
- " (_key, age, firstName, lastName)" +
- " format csv");
-
- return null;
- }, SQLException.class, "The query was cancelled while executing.");
+ finally {
+ try {
+ awaitTimeout();
+ }
+ catch (Exception e) {
+ log.error("Unexpected exception.", e);
- // Ensure that there were no exceptions within async cancellation process.
- cancelRes.get(CHECK_RESULT_TIMEOUT);
+ Assert.fail("Unexpected exception");
+ }
+ }
+ });
}
/**
@@ -570,6 +819,24 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
+ * Create and set new CyclicBarrier for the function.
+ *
+ * @param parties the number of threads that must invoke await method before the barrier is tripped
+ */
+ private static void newBarrier(int parties) {
+ barrier = new CyclicBarrier(parties);
+ }
+
+ /**
+ * @throws InterruptedException In case of failure.
+ * @throws TimeoutException In case of failure.
+ * @throws BrokenBarrierException In case of failure.
+ */
+ private static void awaitTimeout() throws InterruptedException, TimeoutException, BrokenBarrierException {
+ barrier.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ /**
* Utility class with custom SQL functions.
*/
public static class TestSQLFunctions {