You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/19 10:38:04 UTC
[ignite] branch master updated: IGNITE-11918: SQL: Extend test
coverage for KILL QUERY. This closes #6611.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0b6a918 IGNITE-11918: SQL: Extend test coverage for KILL QUERY. This closes #6611.
0b6a918 is described below
commit 0b6a918b2ab0881fe114cc1943e97948325d710c
Author: Pavel Kuznetsov <pa...@gmail.com>
AuthorDate: Wed Jun 19 13:37:15 2019 +0300
IGNITE-11918: SQL: Extend test coverage for KILL QUERY. This closes #6611.
---
modules/core/src/main/resources/ignite.properties | 2 +-
.../query/h2/twostep/GridReduceQueryExecutor.java | 9 +-
.../processors/query/KillQueryFromClientTest.java | 4 +-
.../query/KillQueryFromNeighbourTest.java | 4 +-
.../internal/processors/query/KillQueryTest.java | 848 +++++++++++++++------
modules/yarn/src/main/resources/ignite.properties | 2 +-
6 files changed, 611 insertions(+), 258 deletions(-)
diff --git a/modules/core/src/main/resources/ignite.properties b/modules/core/src/main/resources/ignite.properties
index 57dcd33..90d5787 100644
--- a/modules/core/src/main/resources/ignite.properties
+++ b/modules/core/src/main/resources/ignite.properties
@@ -15,7 +15,7 @@
# limitations under the License.
#
-ignite.version=2.7.0-SNAPSHOT
+ignite.version=2.8.0-SNAPSHOT
ignite.build=0
ignite.revision=DEV
ignite.rel.date=01011970
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index fb4779e..5f3886c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -386,6 +386,13 @@ public class GridReduceQueryExecutor {
ReduceQueryRun lastRun = null;
for (int attempt = 0;; attempt++) {
+ try {
+ cancel.checkCancelled();
+ }
+ catch (QueryCancelledException cancelEx) {
+ throw new CacheException("Failed to run reduce query locally. " + cancelEx.getMessage(), cancelEx);
+ }
+
if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) {
UUID retryNodeId = lastRun.retryNodeId();
String retryCause = lastRun.retryCause();
@@ -724,7 +731,7 @@ public class GridReduceQueryExecutor {
cause = disconnectedErr;
}
- throw new CacheException("Failed to run reduce query locally.", cause);
+ throw new CacheException("Failed to run reduce query locally. " + cause.getMessage(), cause);
}
finally {
if (detachedConn != null)
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java
index 92fb3f9..9c4b9bc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java
@@ -20,13 +20,13 @@ package org.apache.ignite.internal.processors.query;
import org.apache.ignite.internal.IgniteEx;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
/**
* Test KILL QUERY requested from client node.
*/
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
public class KillQueryFromClientTest extends KillQueryTest {
/** {@inheritDoc} */
@Override protected IgniteEx getKillRequestNode() {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java
index 3b0a0ea..c4e0756 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java
@@ -20,12 +20,12 @@ package org.apache.ignite.internal.processors.query;
import org.apache.ignite.internal.IgniteEx;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
/**
* Test KILL QUERY requested from neighbour server node.
*/
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
public class KillQueryFromNeighbourTest extends KillQueryTest {
/** {@inheritDoc} */
@Override protected IgniteEx getKillRequestNode() {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index fa6553e..a96dafc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -19,6 +19,7 @@
package org.apache.ignite.internal.processors.query;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -32,14 +33,23 @@ import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -47,12 +57,27 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
+import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -60,14 +85,17 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import static java.util.Arrays.stream;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
@@ -76,8 +104,24 @@ import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
* Test KILL QUERY requested from the same node where quere was executed.
*/
@SuppressWarnings({"ThrowableNotThrown", "AssertWithSideEffects"})
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
+// We need to set this threshold bigger than partitions count to force partition pruning for the BETWEEN case.
+// see org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor.tryExtractBetween
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN, value = "21")
public class KillQueryTest extends GridCommonAbstractTest {
+ /** Generates values for the {@link #asyncCancel} parameter. */
+ @Parameterized.Parameters(name = "asyncCancel = {0}")
+ public static Iterable<Object[]> valuesForAsync() {
+ return Arrays.asList(new Object[][] {
+ {true},
+ {false}
+ });
+ }
+
+ /** Whether current test execution shuould use async or non-async cancel mechanism. */
+ @Parameterized.Parameter
+ public boolean asyncCancel;
+
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -98,6 +142,9 @@ public class KillQueryTest extends GridCommonAbstractTest {
/** Timeout for checking async result. */
public static final int CHECK_RESULT_TIMEOUT = 1_000;
+ /** Number of partitions in the test chache. Keep it small to have enough rows in each partitions. */
+ public static final int PARTS_CNT = 20;
+
/** Connection. */
private Connection conn;
@@ -116,8 +163,11 @@ public class KillQueryTest extends GridCommonAbstractTest {
/** Table count. */
private static AtomicInteger tblCnt = new AtomicInteger();
- /** Barrier. */
- private static volatile CyclicBarrier barrier;
+ /** Barrier. Needed to test unsupported cancelation. Doesn't block threads (parties=1) by default. */
+ private static volatile CyclicBarrier barrier = new CyclicBarrier(1);
+
+ /** Allows to block messages, issued FROM the client node. */
+ private static TestRecordingCommunicationSpi clientBlocker;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -125,6 +175,8 @@ public class KillQueryTest extends GridCommonAbstractTest {
CacheConfiguration<?, ?> cache = GridAbstractTest.defaultCacheConfiguration();
+ cache.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
cache.setCacheMode(PARTITIONED);
cache.setBackups(1);
cache.setWriteSynchronizationMode(FULL_SYNC);
@@ -133,16 +185,22 @@ public class KillQueryTest extends GridCommonAbstractTest {
cfg.setCacheConfiguration(cache);
- if (++cntr == NODES_COUNT)
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+ cfg.setCommunicationSpi(commSpi);
+
+ if (++cntr == NODES_COUNT) {
cfg.setClientMode(true);
- cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ clientBlocker = commSpi;
+ }
+ cfg.setDiscoverySpi(new TcpDiscoverySpi() {
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
- if (CustomMessageWrapper.class.isAssignableFrom(msg.getClass())) {
+ if (msg instanceof CustomMessageWrapper) {
DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();
- if (DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) {
+ if (delegate instanceof DynamicCacheChangeBatch) {
try {
awaitTimeout();
}
@@ -151,7 +209,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
}
- else if (SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) {
+ else if (delegate instanceof SchemaProposeDiscoveryMessage) {
try {
awaitTimeout();
}
@@ -168,19 +226,91 @@ public class KillQueryTest extends GridCommonAbstractTest {
return cfg;
}
+ /**
+ * Creates and populates a new cache that is used in distributed join scenario: We have table of Persons with some
+ * autogenerated PK. Join filter should be based on Person.id column which is not collocated with the pk. Result
+ * size of such join (with eq condition) is {@link #MAX_ROWS} rows.
+ *
+ * @param cacheName name of the created cache.
+ * @param shift integer to avoid collocation, put different value for the different caches.
+ */
+ private void createJoinCache(String cacheName, int shift) {
+ CacheConfiguration<Long, Person> ccfg = GridAbstractTest.defaultCacheConfiguration();
+
+ ccfg.setName(cacheName);
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setSqlFunctionClasses(TestSQLFunctions.class);
+
+ ccfg.setQueryEntities(Collections.singleton(
+ new QueryEntity(Integer.class.getName(), Person.class.getName())
+ .setTableName("PERSON")
+ .setKeyFieldName("rec_id") // PK
+ .addQueryField("rec_id", Integer.class.getName(), null)
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("lastName", String.class.getName(), null)
+ .setIndexes(Collections.singleton(new QueryIndex("id", true, "idx_" + cacheName)))
+ ));
+
+ grid(0).createCache(ccfg);
+
+ try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(cacheName)) {
+
+ for (int recordId = 0; recordId < MAX_ROWS; recordId++) {
+ // If two caches has the same PK, FK fields ("id") will be different.
+ int intTabIdFK = (recordId + shift) % MAX_ROWS;
+
+ ds.addData(recordId,
+ new Person(intTabIdFK,
+ "Name_" + recordId,
+ "LastName_" + recordId,
+ 42));
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
cntr = 0;
+ GridQueryProcessor.idxCls = MockedIndexing.class;
+
startGrids(NODES_COUNT);
- for (int i = 0; i < MAX_ROWS; ++i) {
- grid(0).cache(GridAbstractTest.DEFAULT_CACHE_NAME).put(i, i);
+ // Let's set baseline topology manually. Doing so we are sure that partitions are distributed beetween our 2 srv
+ // nodes, not belong only one node.
+ awaitPartitionMapExchange(true, true, null);
+
+ long curTop = grid(0).cluster().topologyVersion();
+
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ grid(0).cluster().setBaselineTopology(curTop);
- grid(0).cache(GridAbstractTest.DEFAULT_CACHE_NAME).put((long)i, (long)i);
+ awaitPartitionMapExchange(true, true, null);
+
+ // Populate data.
+ try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(GridAbstractTest.DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < MAX_ROWS; ++i) {
+ ds.addData(i, i);
+
+ ds.addData((long)i, (long)i);
+ }
}
+
+ createJoinCache("PERS1", 1);
+ createJoinCache("PERS2", 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ GridQueryProcessor.idxCls = null;
+
+ super.afterTestsStopped();
}
/**
@@ -197,7 +327,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
*/
@Before
public void before() throws Exception {
- TestSQLFunctions.init();
+ TestSQLFunctions.reset();
newBarrier(1);
@@ -212,6 +342,8 @@ public class KillQueryTest extends GridCommonAbstractTest {
ignite = grid(0);
igniteForKillRequest = getKillRequestNode();
+
+ MockedIndexing.resetToDefault();
}
/**
@@ -221,6 +353,10 @@ public class KillQueryTest extends GridCommonAbstractTest {
*/
@After
public void after() throws Exception {
+ MockedIndexing.resetToDefault();
+
+ clientBlocker.stopBlock(false);
+
if (stmt != null && !stmt.isClosed()) {
stmt.close();
@@ -233,176 +369,130 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
- * Trying to cancel COPY FROM command.
+ * Tries to cancel COPY FROM command, then checks such cancellation is unsupported.
*
- * @throws Exception In case of failure.
+ * 1) Run COPY query, got it suspended in the middle. 2) Try to cancel it, get expected exception. 3) Wake up the
+ * COPY. 4) Check COPY is done.
*/
@Test
public void testBulkLoadCancellationUnsupported() throws Exception {
- checkCreateTableCancellationUnsupported(false);
- }
-
- /**
- * Trying to async cancel COPY FROM command.
- *
- * @throws Exception In case of failure.
- */
- @Test
- public void testAsyncBulkLoadCancellationUnsupported() throws Exception {
- checkCreateTableCancellationUnsupported(true);
- }
-
- /**
- * Trying to cancel COPY FROM command.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception In case of failure.
- */
- public void checkBulkLoadCancellationUnsupported(boolean async) throws Exception {
String path = Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
.getAbsolutePath();
- String sqlPrepare = "CREATE TABLE " + currentTestTableName() +
+ String createTab = "CREATE TABLE " + currentTestTableName() +
"(id integer primary key, age integer, firstName varchar, lastname varchar)";
- String sqlCmd = "COPY FROM '" + path + "'" +
+
+ String copy = "COPY FROM '" + path + "'" +
" INTO " + currentTestTableName() +
- " (_key, age, firstName, lastName)" +
+ " (id, age, firstName, lastName)" +
" format csv charset 'ascii'";
- checkCancellationUnsupported(
- Arrays.asList(sqlPrepare),
- sqlCmd,
- async);
+ // It's importaint to COPY from the client node: in this case datastreamer doesn't perform local updates so
+ // it sends communication messages which we can hold.
+ IgniteEx clientNode = grid(NODES_COUNT - 1);
+
+ try (Connection clConn = GridTestUtils.connect(clientNode, null);
+ final Statement client = clConn.createStatement()) {
+ client.execute(createTab);
+
+ // Suspend further copy query by holding data streamer messages.
+ clientBlocker.blockMessages((dstNode, msg) -> msg instanceof DataStreamerRequest);
+
+ IgniteInternalFuture<Boolean> copyIsDone = GridTestUtils.runAsync(() -> client.execute(copy));
+
+ // Wait at least one streamer message, that means copy started.
+ clientBlocker.waitForBlocked(1, TIMEOUT);
+
+ // Query can be found only on the connected node.
+ String globQryId = findOneRunningQuery(copy, clientNode);
+
+ GridTestUtils.assertThrowsAnyCause(log,
+ () -> igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(globQryId, asyncCancel)),
+ CacheException.class,
+ "Query doesn't support cancellation");
+
+ // Releases copy.
+ clientBlocker.stopBlock(true);
+
+ copyIsDone.get(TIMEOUT);
+
+ int tabSize = clientNode.cache(DEFAULT_CACHE_NAME)
+ .query(new SqlFieldsQuery("SELECT * FROM " + currentTestTableName() + " ").setSchema("PUBLIC"))
+ .getAll()
+ .size();
+
+ assertEquals("COPY command inserted incorrect number of rows.", 1, tabSize);
+ }
}
/**
- * Trying to cancel CREATE TABLE command.
+ * Finds global id of the specified query on the specified node. Expecting exactly one result.
*
- * @throws Exception In case of failure.
+ * @param query Query text to find id for.
+ * @param node Node handle to the node, which initiated the query.
*/
- @Test
- public void testCreateTableCancellationUnsupported() throws Exception {
- checkCreateTableCancellationUnsupported(false);
+ private String findOneRunningQuery(String query, IgniteEx node) {
+ List<GridRunningQueryInfo> qryList = findQueriesOnNode(query, node);
+
+ assertEquals("Expected only one running query: " + query + "\nBut found: " + qryList, 1, qryList.size());
+
+ return qryList.get(0).globalQueryId();
}
/**
- * Trying to async cancel CREATE TABLE command.
+ * Finds queries that has specified text and are initially runned on the specified server node.
*
- * @throws Exception In case of failure.
+ * @param query text of the query to find.
+ * @param node server node that runs the reduce query.
*/
- @Test
- public void testAsyncCreateTableCancellationUnsupported() throws Exception {
- checkCreateTableCancellationUnsupported(true);
+ private List<GridRunningQueryInfo> findQueriesOnNode(String query, IgniteEx node) {
+ List<GridRunningQueryInfo> allQrs = (List<GridRunningQueryInfo>)node.context().query().runningQueries(-1);
+
+ return allQrs.stream()
+ .filter(q -> q.query().equals(query))
+ .collect(Collectors.toList());
}
/**
* Trying to cancel CREATE TABLE command.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception In case of failure.
*/
- public void checkCreateTableCancellationUnsupported(boolean async) throws Exception {
+ @Test
+ public void testCreateTableCancellationUnsupported() throws Exception {
checkCancellationUnsupported(Collections.<String>emptyList(),
"CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
- async);
+ asyncCancel);
}
/**
* Trying to cancel ALTER TABLE command.
- *
- * @throws Exception In case of failure.
*/
@Test
public void testAlterTableCancellationUnsupported() throws Exception {
- checkAlterTableCancellationUnsupported(false);
- }
-
- /**
- * Trying to async cancel ALTER TABLE command.
- *
- * @throws Exception In case of failure.
- */
- @Test
- public void testAsyncAlterTableCancellationUnsupported() throws Exception {
- checkAlterTableCancellationUnsupported(true);
- }
-
- /**
- * Trying to cancel ALTER TABLE command.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception In case of failure.
- */
- private void checkAlterTableCancellationUnsupported(boolean async) throws Exception {
checkCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
"ALTER TABLE " + currentTestTableName() + " ADD COLUMN COL VARCHAR",
- async);
+ asyncCancel);
}
/**
* Trying to cancel CREATE INDEX command.
- *
- * @throws Exception In case of failure.
*/
@Test
public void testCreateIndexCancellationUnsupported() throws Exception {
- checkCreateIndexCancellationUnsupported(false);
- }
-
- /**
- * Trying to async cancel CREATE INDEX command.
- *
- * @throws Exception In case of failure.
- */
- @Test
- public void testAsyncCreateIndexCancellationUnsupported() throws Exception {
- checkCreateIndexCancellationUnsupported(true);
- }
-
- /**
- * Trying to cancel CREATE INDEX command.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception In case of failure.
- */
- private void checkCreateIndexCancellationUnsupported(boolean async) throws Exception {
checkCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
"CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)",
- async);
+ asyncCancel);
}
/**
* Trying to cancel DROP INDEX command.
- *
- * @throws Exception In case of failure.
*/
@Test
public void testDropIndexCancellationUnsupported() throws Exception {
- checkDropIndexCancellationUnsupported(false);
- }
-
- /**
- * Trying to async cancel DROP INDEX command.
- *
- * @throws Exception In case of failure.
- */
- @Test
- public void testAsyncDropIndexCancellationUnsupported() throws Exception {
- checkDropIndexCancellationUnsupported(true);
- }
-
- /**
- * Trying to cancel DROP INDEX command.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception In case of failure.
- */
- private void checkDropIndexCancellationUnsupported(boolean async) throws Exception {
checkCancellationUnsupported(
Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
"CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)"),
"DROP INDEX " + currentTestTableName() + "_IDX",
- async);
+ asyncCancel);
}
/**
@@ -422,7 +512,8 @@ public class KillQueryTest extends GridCommonAbstractTest {
* @param async Execute cancellation in ASYNC mode.
* @throws Exception In case of failure.
*/
- private void checkCancellationUnsupported(List<String> prepareSteps, String sqlCmd, boolean async) throws Exception {
+ private void checkCancellationUnsupported(List<String> prepareSteps, String sqlCmd,
+ boolean async) throws Exception {
for (String sql : prepareSteps) {
try {
stmt.execute(sql);
@@ -444,32 +535,15 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
- * Trying to cancel unexist query.
+ * Trying to cancel non-existing query.
*/
@Test
public void testKillUnknownQry() {
- checkKillUnknownQry(false);
- }
-
- /**
- * Trying to async cancel unexist query.
- */
- @Test
- public void testAsyncKillUnknownQry() {
- checkKillUnknownQry(true);
- }
-
- /**
- * Trying to cancel unexist query.
- *
- * @param async execute cancellation in ASYNC mode.
- */
- private void checkKillUnknownQry(boolean async) {
UUID nodeId = ignite.localNode().id();
GridTestUtils.assertThrows(log, () -> {
igniteForKillRequest.cache(DEFAULT_CACHE_NAME)
- .query(createKillQuery(nodeId, Long.MAX_VALUE, async));
+ .query(createKillQuery(nodeId, Long.MAX_VALUE, asyncCancel));
return null;
}, CacheException.class, "Query with provided ID doesn't exist [nodeId=" + nodeId);
@@ -480,26 +554,9 @@ public class KillQueryTest extends GridCommonAbstractTest {
*/
@Test
public void testKillQryUnknownNode() {
- checkKillQryUnknownNode(false);
- }
-
- /**
- * Trying to async cancel query on unknown node.
- */
- @Test
- public void testAsyncKillQryUnknownNode() {
- checkKillQryUnknownNode(true);
- }
-
- /**
- * Trying to cancel query on unexist node.
- *
- * @param async execute cancellation in ASYNC mode.
- */
- private void checkKillQryUnknownNode(boolean async) {
GridTestUtils.assertThrows(log, () -> {
igniteForKillRequest.cache(DEFAULT_CACHE_NAME)
- .query(createKillQuery(UUID.randomUUID(), Long.MAX_VALUE, async));
+ .query(createKillQuery(UUID.randomUUID(), Long.MAX_VALUE, asyncCancel));
return null;
}, CacheException.class, "Failed to cancel query, node is not alive");
@@ -507,44 +564,22 @@ public class KillQueryTest extends GridCommonAbstractTest {
/**
* Trying to kill already killed query. No exceptions expected.
- *
- * @throws Exception If failed.
*/
@Test
public void testKillAlreadyKilledQuery() throws Exception {
- checkKillAlreadyKilledQuery(false);
- }
-
- /**
- * Trying to kill already killed query. No exceptions expected.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testAsyncKillAlreadyKilledQuery() throws Exception {
- checkKillAlreadyKilledQuery(true);
- }
-
- /**
- * Trying to kill already killed query. No exceptions expected.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception If failed.
- */
- private void checkKillAlreadyKilledQuery(boolean async) throws Exception {
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
- FieldsQueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select * from Integer"));
+ FieldsQueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select * from Integer where awaitLatchCancelled() = 0"));
List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)ignite.context().query().runningQueries(-1);
GridRunningQueryInfo runQryInfo = runningQueries.get(0);
- SqlFieldsQuery killQry = createKillQuery(runQryInfo.globalQueryId(), async);
+ SqlFieldsQuery killQry = createKillQuery(runQryInfo.globalQueryId(), asyncCancel);
IgniteCache<Object, Object> reqCache = igniteForKillRequest.cache(DEFAULT_CACHE_NAME);
- IgniteInternalFuture killFut = GridTestUtils.runAsync(() -> reqCache.query(killQry));
+ IgniteInternalFuture killFut = cancel(1, asyncCancel);
GridTestUtils.assertThrows(log,
() -> cur.iterator().next(),
@@ -579,36 +614,37 @@ public class KillQueryTest extends GridCommonAbstractTest {
/**
* Trying to cancel long running query. No exceptions expected.
- *
- * @throws Exception If failed.
*/
@Test
public void testCancelQuery() throws Exception {
- checkCancelQuery(false);
+ IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+ GridTestUtils.assertThrows(log, () -> {
+ stmt.executeQuery("select * from Integer where _key in " +
+ "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
+
+ return null;
+ }, SQLException.class, "The query was cancelled while executing.");
+
+ // Ensures that there were no exceptions within async cancellation process.
+ cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
- * Trying to async cancel long running query. No exceptions expected.
- *
- * @throws Exception If failed.
+ * Trying to cancel long running query if partition pruning does it job. It's important to set {@link
+ * IgniteSystemProperties#IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN} bigger than partitions count {@link
+ * #PARTS_CNT}
*/
@Test
- public void testAsyncCancelQuery() throws Exception {
- checkCancelQuery(true);
- }
+ public void testCancelQueryPartitionPruning() throws Exception {
+ IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
- /**
- * Trying to cancel long running query. No exceptions expected.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception If failed.
- */
- private void checkCancelQuery(boolean async) throws Exception {
- IgniteInternalFuture cancelRes = cancel(1, async);
+ final int ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL = 400;
GridTestUtils.assertThrows(log, () -> {
- stmt.executeQuery("select * from Integer where _key in " +
- "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
+ stmt.executeQuery("select * from Integer where _key between 1000 and 2000 " +
+ "and awaitLatchCancelled() = 0 " +
+ "and shouldNotBeCalledMoreThan(" + ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL + ")");
return null;
}, SQLException.class, "The query was cancelled while executing.");
@@ -618,32 +654,58 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
- * Trying to cancel long running multiple statements query. No exceptions expected.
- *
- * @throws Exception If failed.
+ * Check that local query can be canceled either using async or non-async method. Local query is performed using
+ * cache.query() API with "local" property "true".
*/
@Test
- public void testKillMultipleStatementsQuery() throws Exception {
- checkKillMultipleStatementsQuery(false);
+ public void testCancelLocalQueryNative() throws Exception {
+ IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+ GridTestUtils.assertThrows(log, () -> {
+ ignite.cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery("select * from Integer where _key in " +
+ "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()")
+ .setLocal(true)
+ ).getAll();
+
+ return null;
+ }, IgniteException.class, "The query was cancelled while executing.");
+
+ // Ensures that there were no exceptions within async cancellation process.
+ cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
- * Trying to async cancel long running multiple statements query. No exceptions expected.
- *
- * @throws Exception If failed.
+ * Check distributed query can be canceled.
*/
@Test
- public void testAsyncKillMultipleStatementsQuery() throws Exception {
- checkKillMultipleStatementsQuery(true);
+ public void testCancelDistributeJoin() throws Exception {
+ IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+ final int ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL = MAX_ROWS - 1;
+
+ GridTestUtils.assertThrows(log, () -> {
+ ignite.cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery("SELECT p1.rec_id, p1.id, p2.rec_id " +
+ "FROM PERS1.Person p1 JOIN PERS2.Person p2 " +
+ "ON p1.id = p2.id " +
+ "AND shouldNotBeCalledMoreThan(" + ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL + ")" +
+ "AND awaitLatchCancelled() = 0")
+ .setDistributedJoins(true)
+ ).getAll();
+
+ return null;
+ }, CacheException.class, "The query was cancelled while executing.");
+
+ // Ensures that there were no exceptions within async cancellation process.
+ cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Trying to async cancel long running multiple statements query. No exceptions expected.
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception If failed.
*/
- private void checkKillMultipleStatementsQuery(boolean async) throws Exception {
+ @Test
+ public void testKillMultipleStatementsQuery() throws Exception {
try (Statement anotherStatement = conn.createStatement()) {
anotherStatement.setFetchSize(1);
@@ -653,7 +715,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
assert rs.next();
- IgniteInternalFuture cancelRes = cancel(3, async, sql);
+ IgniteInternalFuture cancelRes = cancel(3, asyncCancel, sql);
GridTestUtils.assertThrows(log, () -> {
// Executes multiple long running query
@@ -677,26 +739,6 @@ public class KillQueryTest extends GridCommonAbstractTest {
*/
@Test
public void testCancelBatchQuery() throws Exception {
- checkCancelBatchQuery(false);
- }
-
- /**
- * Trying to async cancel long running batch query. No exceptions expected. *
- *
- * @throws Exception If failed.
- */
- @Test
- public void testAsyncCancelBatchQuery() throws Exception {
- checkCancelBatchQuery(true);
- }
-
- /**
- * Trying to cancel long running batch query. No exceptions expected. *
- *
- * @param async execute cancellation in ASYNC mode.
- * @throws Exception If failed.
- */
- private void checkCancelBatchQuery(boolean async) throws Exception {
try (Statement stmt2 = conn.createStatement()) {
stmt2.setFetchSize(1);
@@ -706,7 +748,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
Assert.assertTrue(rs.next());
- IgniteInternalFuture cancelRes = cancel(2, async, sql);
+ IgniteInternalFuture cancelRes = cancel(2, asyncCancel, sql);
GridTestUtils.assertThrows(log, () -> {
stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
@@ -726,6 +768,169 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
+ * Check if query hangs (due to reducer spins retrying to reserve partitions but they can't be reserved), we still
+ * able to cancel it. Used mocked indexing simulates 100% unability.
+ */
+ @Test
+ public void testCancelQueryIfPartitionsCantBeReservedOnMapNodes() throws Exception {
+ // Releases thread that kills query when map nodes receive query request. At least one map node received is ok.
+ GridMessageListener qryStarted = (node, msg, plc) -> {
+ if (msg instanceof GridH2QueryRequest)
+ TestSQLFunctions.cancelLatch.countDown();
+ };
+
+ for (int i = 0; i < NODES_COUNT; i++)
+ grid(i).context().io().addMessageListener(GridTopic.TOPIC_QUERY, qryStarted);
+
+ // Suspends distributed queries on the map nodes.
+ MockedIndexing.failReservations = true;
+
+ try {
+ IgniteInternalFuture cancelFut = cancel(1, asyncCancel);
+
+ GridTestUtils.assertThrows(log, () -> {
+ ignite.cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery("select * from Integer where _val <> 42")
+ ).getAll();
+
+ return null;
+ }, CacheException.class, "The query was cancelled while executing.");
+
+ cancelFut.get(CHECK_RESULT_TIMEOUT);
+ }
+ finally {
+ for (int i = 0; i < NODES_COUNT; i++)
+ grid(i).context().io().removeMessageListener(GridTopic.TOPIC_QUERY, qryStarted);
+ }
+ }
+
+ /**
+ * Check if query hangs due to reducer cannot get nodes for partitions, it still can be killed.
+ */
+ @Test
+ public void testCancelQueryIfUnableToGetNodesForPartitions() throws Exception {
+ // Force query to spin retrying to get nodes for partitions.
+ MockedIndexing.retryNodePartMapping = true;
+
+ String select = "select * from Integer where _val <> 42";
+
+ IgniteInternalFuture runQueryFut = GridTestUtils.runAsync(() ->
+ ignite.cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery(select)
+ ).getAll());
+
+ boolean gotOneFreezedSelect = GridTestUtils.waitForCondition(
+ () -> findQueriesOnNode(select, ignite).size() == 1, TIMEOUT);
+
+ if (!gotOneFreezedSelect) {
+ if (runQueryFut.isDone())
+ printFuturesException("Got exception getting running the query.", runQueryFut);
+
+ Assert.fail("Failed to wait for query to be in running queries list exactly one time " +
+ "[select=" + select + ", node=" + ignite.localNode().id() + ", timeout=" + TIMEOUT + "ms].");
+
+ }
+
+ SqlFieldsQuery killQry = createKillQuery(findOneRunningQuery(select, ignite), asyncCancel);
+
+ ignite.cache(DEFAULT_CACHE_NAME).query(killQry);
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> runQueryFut.get(CHECK_RESULT_TIMEOUT),
+ CacheException.class,
+ "The query was cancelled while executing.");
+
+ }
+
+ /**
+ * Print to log exception that have been catched on other thread and have been put to specified future.
+ *
+ * @param msg message to add.
+ * @param fut future containing the exception.
+ */
+ private void printFuturesException(String msg, IgniteInternalFuture fut) {
+ try {
+ fut.get(TIMEOUT);
+ }
+ catch (Exception e) {
+ log.error(msg, e);
+ }
+ }
+
+ /**
+ * Test if user specified partitions for query explicitly, such query is cancealble.
+ *
+ * We check 3 scenarious in which partitions are belong to: 1) only first node <br/> 2) only second node <br/> 3)
+ * some to first, the others to second <br/>
+ */
+ @Test
+ public void testCancelQueryWithPartitions() throws Exception {
+ Affinity<Object> aff = ignite.affinity(DEFAULT_CACHE_NAME);
+
+ int halfOfNodeParts = PARTS_CNT / 4;
+
+ int[] firstParts = stream(aff.primaryPartitions(grid(0).localNode())).limit(halfOfNodeParts).toArray();
+ int[] secondParts = stream(aff.primaryPartitions(grid(1).localNode())).limit(halfOfNodeParts).toArray();
+
+ int[] mixedParts = IntStream.concat(
+ stream(firstParts).limit(halfOfNodeParts),
+ stream(secondParts).limit(halfOfNodeParts)
+ ).toArray();
+
+ checkPartitions(firstParts);
+ checkPartitions(secondParts);
+ checkPartitions(mixedParts);
+ }
+
+ /**
+ * Test if user specified partitions for query explicitly, such query is cancealble.
+ *
+ * @param partitions user specified partitions, could contain partitions that are mapped on one or both nodes.
+ */
+ public void checkPartitions(int[] partitions) throws Exception {
+ TestSQLFunctions.reset();
+
+ IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+ GridTestUtils.assertThrows(log, () -> {
+ ignite.cache(DEFAULT_CACHE_NAME).query(
+ new SqlFieldsQuery("select * from Integer where _key in " +
+ "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()")
+ .setPartitions(partitions)
+ ).getAll();
+
+ return null;
+ }, CacheException.class, "The query was cancelled while executing.");
+
+ // Ensures that there were no exceptions within async cancellation process.
+ cancelRes.get(CHECK_RESULT_TIMEOUT);
+ }
+
+ /**
+ * Wait until all map parts are finished on the specified node. Not needed when IGN-13862 is done.
+ *
+ * @param node node for which map request completion to wait.
+ */
+ private void ensureMapQueriesHasFinished(IgniteEx node) throws Exception {
+ boolean noTasksInQryPool = GridTestUtils.waitForCondition(() -> queryPoolIsEmpty(node), TIMEOUT);
+
+ Assert.assertTrue("Node " + node.localNode().id() + " has not finished its tasks in the query pool",
+ noTasksInQryPool);
+ }
+
+ /**
+ * @param node node which query pool to check.
+ * @return {@code True} if {@link GridIoPolicy#QUERY_POOL} is empty. This means no queries are currntly executed and
+ * no queries are executed at the moment; {@code false} otherwise.
+ */
+ private boolean queryPoolIsEmpty(IgniteEx node) {
+ ThreadPoolExecutor qryPool = (ThreadPoolExecutor)node.context().getQueryExecutorService();
+
+ return qryPool.getQueue().isEmpty() && qryPool.getActiveCount() == 0;
+ }
+
+ /**
* Cancels current query which wait on barrier.
*
* @param qry Query which need to try cancel.
@@ -795,15 +1000,25 @@ public class KillQueryTest extends GridCommonAbstractTest {
));
}
+ // This sleep is required to wait for kill queries get started.
doSleep(500);
+ if (expQryNum != runningQueries.size())
+ log.error("Found running queries are incorrect, " +
+ "expected only " + expQryNum + " queries. Found : " + runningQueries);
+
assertEquals(expQryNum, runningQueries.size());
TestSQLFunctions.reqLatch.countDown();
- for (IgniteInternalFuture fut : res) {
+ for (IgniteInternalFuture fut : res)
fut.get(TIMEOUT);
- }
+
+ // Currently canceled query returns (unblocks the caller code) without waiting for map parts of the
+ // query to be finished. We need to wait for them. This is a workaround for IGN-13862 because we
+ // observe side effects if map parts of canceled query are still running.
+ ensureMapQueriesHasFinished(grid(0));
+ ensureMapQueriesHasFinished(grid(1));
}
catch (Exception e) {
log.error("Unexpected exception.", e);
@@ -843,6 +1058,9 @@ public class KillQueryTest extends GridCommonAbstractTest {
* @param parties the number of threads that must invoke await method before the barrier is tripped
*/
private static void newBarrier(int parties) {
+ if (barrier != null)
+ barrier.reset();
+
barrier = new CyclicBarrier(parties);
}
@@ -860,23 +1078,40 @@ public class KillQueryTest extends GridCommonAbstractTest {
*/
public static class TestSQLFunctions {
/** Request latch. */
- static CountDownLatch reqLatch;
+ static volatile CountDownLatch reqLatch;
/** Cancel latch. */
- static CountDownLatch cancelLatch;
+ static volatile CountDownLatch cancelLatch;
/** Suspend query latch. */
- static CountDownLatch suspendQryLatch;
+ static volatile CountDownLatch suspendQryLatch;
+
+ /** How many times function {@link #shouldNotBeCalledMoreThan} have been called so far. */
+ static volatile AtomicInteger funCallCnt;
/**
- * Recreate latches.
+ * Recreate latches. Old latches are released.
*/
- static void init() {
+ static void reset() {
+ releaseLatches(reqLatch, cancelLatch, suspendQryLatch);
+
reqLatch = new CountDownLatch(1);
cancelLatch = new CountDownLatch(1);
suspendQryLatch = new CountDownLatch(1);
+
+ funCallCnt = new AtomicInteger(0);
+ }
+
+ /**
+ * @param latches latches to release. Our latches have initial count = 1.
+ */
+ private static void releaseLatches(CountDownLatch... latches) {
+ for (CountDownLatch l : latches) {
+ if (l != null)
+ l.countDown();
+ }
}
/**
@@ -915,6 +1150,22 @@ public class KillQueryTest extends GridCommonAbstractTest {
}
/**
+ * Asserts that this function have not been called more than specified number times. Otherwise we're failing.
+ * Intended to check that query is canceled but since cancel is not instant (query continues to process some
+ * number of rows), it don't process all the rows in the table (in case of full scan, of course).
+ *
+ * @param times deadline times.
+ * @return always {@link true}.
+ */
+ @QuerySqlFunction
+ public static boolean shouldNotBeCalledMoreThan(int times) {
+ if (funCallCnt.incrementAndGet() >= times)
+ fail("Query is running too long since it was canceled.");
+
+ return true;
+ }
+
+ /**
* If called fails with corresponding message.
*
* @return 0;
@@ -1007,4 +1258,99 @@ public class KillQueryTest extends GridCommonAbstractTest {
return result;
}
}
+
+ /**
+ * Mocked indexing to eventually suspend mapper or reducer code. It simulates never ending PMEs, unstable topologies
+ * and etc.
+ */
+ static class MockedIndexing extends IgniteH2Indexing {
+ /**
+ * All the time this flag is set to {@code true}, no partitions can be reserved. Acts like normal indexing by
+ * default.
+ */
+ static volatile boolean failReservations;
+
+ /**
+ * All the time this flag is set to {@code true}, reducer is not able to get nodes for given partitions because
+ * mapper says "retry later".
+ */
+ static volatile boolean retryNodePartMapping;
+
+ /**
+ * Result of the mapping partitions to nodes, that indicates that caller should retry request later.
+ */
+ private static final ReducePartitionMapResult RETRY_RESULT = new ReducePartitionMapResult(null, null, null);
+
+ /**
+ * Resets to default behaviour: disable all mock added logic.
+ */
+ static void resetToDefault() {
+ failReservations = false;
+
+ retryNodePartMapping = false;
+ }
+
+ /**
+ * Setups mock objects into this indexing, just after super initialization is done.
+ */
+ @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
+ super.start(ctx, busyLock);
+
+ setReservationManager(new PartitionReservationManager(super.ctx) {
+ /** {@inheritDoc} */
+ @Override public PartitionReservation reservePartitions(@Nullable List<Integer> cacheIds,
+ AffinityTopologyVersion reqTopVer, int[] explicitParts, UUID nodeId, long reqId)
+ throws IgniteCheckedException {
+ return failReservations ? new PartitionReservation(null,
+ "[TESTS]: Failed to reserve partitions for the testing purpose!") :
+ super.reservePartitions(cacheIds, reqTopVer, explicitParts, nodeId, reqId);
+ }
+ });
+
+ setMapper(new ReducePartitionMapper(ctx, ctx.log(GridReduceQueryExecutor.class)) {
+ /** {@inheritDoc} */
+ @Override public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds,
+ AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly, long qryId) {
+
+ return retryNodePartMapping ? RETRY_RESULT :
+ super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+ }
+ });
+
+ }
+
+ /**
+ * Injects custom {@link PartitionReservationManager} into indexing instance.
+ */
+ protected void setReservationManager(PartitionReservationManager mockMgr) {
+ try {
+ Field partReservationMgrFld = IgniteH2Indexing.class.getDeclaredField("partReservationMgr");
+
+ partReservationMgrFld.setAccessible(true);
+
+ partReservationMgrFld.set(this, mockMgr);
+ }
+ catch (Exception rethrown) {
+ throw new RuntimeException(rethrown);
+ }
+ }
+
+ /**
+ * Injects custom {@link ReducePartitionMapper} into reducer of this indexing instance.
+ */
+ private void setMapper(ReducePartitionMapper mock) {
+ try {
+ GridReduceQueryExecutor rdcExec = this.reduceQueryExecutor();
+
+ Field mapperFld = GridReduceQueryExecutor.class.getDeclaredField("mapper");
+
+ mapperFld.setAccessible(true);
+
+ mapperFld.set(rdcExec, mock);
+ }
+ catch (Exception rethrown) {
+ throw new RuntimeException(rethrown);
+ }
+ }
+ }
}
diff --git a/modules/yarn/src/main/resources/ignite.properties b/modules/yarn/src/main/resources/ignite.properties
index 57dcd33..90d5787 100644
--- a/modules/yarn/src/main/resources/ignite.properties
+++ b/modules/yarn/src/main/resources/ignite.properties
@@ -15,7 +15,7 @@
# limitations under the License.
#
-ignite.version=2.7.0-SNAPSHOT
+ignite.version=2.8.0-SNAPSHOT
ignite.build=0
ignite.revision=DEV
ignite.rel.date=01011970