You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/20 12:46:24 UTC
[05/38] ignite git commit: Fixed IGNITE-2604
"CacheContinuousQueryBatchAck is sent to nodes that doesn't hold cache data".
Fixed IGNITE-2604 "CacheContinuousQueryBatchAck is sent to nodes that doesn't hold cache data".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/675a7c1f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/675a7c1f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/675a7c1f
Branch: refs/heads/ignite-961
Commit: 675a7c1fc321d7f3d19202a5055df3f6076e1fd6
Parents: 6247ac7
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Feb 16 13:47:44 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Feb 16 13:47:44 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 21 +-
.../continuous/CacheContinuousBatchAckTest.java | 355 +++++++++++++++++++
...heContinuousBatchForceServerModeAckTest.java | 80 +++++
.../CacheContinuousQueryLostPartitionTest.java | 4 +-
.../IgniteCacheQuerySelfTestSuite.java | 4 +
6 files changed, 456 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/675a7c1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cf9b439..498f37d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1034,10 +1034,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collection<ClusterNode> nodes = new HashSet<>();
for (AffinityTopologyVersion topVer : t.get2())
- nodes.addAll(ctx.discovery().cacheNodes(topVer));
+ nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer));
for (ClusterNode node : nodes) {
- if (!node.id().equals(ctx.localNodeId())) {
+ if (!node.isLocal()) {
try {
cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/675a7c1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index cc59989..968fc23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -116,17 +117,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
// Append cache name to the topic.
topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
- cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
- new CI2<UUID, CacheContinuousQueryBatchAck>() {
- @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
- CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+ if (cctx.affinityNode()) {
+ cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
+ new CI2<UUID, CacheContinuousQueryBatchAck>() {
+ @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
+ CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
- if (lsnr != null)
- lsnr.cleanupBackupQueue(msg.updateCntrs());
- }
- });
+ if (lsnr != null)
+ lsnr.cleanupBackupQueue(msg.updateCntrs());
+ }
+ });
- cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
+ cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/675a7c1f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
new file mode 100644
index 0000000..c69ccf2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+
+/**
+ * Continuous queries tests.
+ */
+public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implements Serializable {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ protected static final String CLIENT = "_client";
+
+ /** */
+ protected static final String SERVER = "server";
+
+ /** */
+ protected static final String SERVER2 = "server2";
+
+ /** */
+ protected static final AtomicBoolean fail = new AtomicBoolean(false);
+
+ /** */
+ protected static final AtomicBoolean filterOn = new AtomicBoolean(false);
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.endsWith(CLIENT)) {
+ cfg.setClientMode(true);
+
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true, false));
+ }
+ else if (gridName.endsWith(SERVER2))
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, true));
+ else
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, false));
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(SERVER);
+ startGrid(SERVER2);
+ startGrid("1" + CLIENT);
+ startGrid("2" + CLIENT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ fail.set(false);
+
+ filterOn.set(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartition() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionWithFilter() throws Exception {
+ filterOn.set(true);
+
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionNoBackups() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTx() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxWithFilter() throws Exception {
+ filterOn.set(true);
+
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxNoBackup() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxNoBackupWithFilter() throws Exception {
+ filterOn.set(true);
+
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionOffheap() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionOffheapWithFilter() throws Exception {
+ filterOn.set(true);
+
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxOffheap() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicated() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTx() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTxWithFilter() throws Exception {
+ filterOn.set(true);
+
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedOffheap() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTxOffheap() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTxOffheapWithFilter() throws Exception {
+ filterOn.set(true);
+
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, OFFHEAP_TIERED, true));
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void checkBackupAcknowledgeMessage(CacheConfiguration<Object, Object> ccfg) throws Exception {
+ QueryCursor qry = null;
+
+ IgniteCache<Object, Object> cache = null;
+
+ try {
+ ContinuousQuery q = new ContinuousQuery();
+
+ q.setLocalListener(new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ // No-op.
+ }
+ });
+
+ cache = grid(SERVER).getOrCreateCache(ccfg);
+
+ qry = cache.query(q);
+
+ for (int i = 0; i < 10000; i++)
+ cache.put(i, i);
+
+ assert !GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return fail.get();
+ }
+ }, 1300L);
+ }
+ finally {
+ if (qry != null)
+ qry.close();
+
+ if (cache != null)
+ grid(SERVER).destroyCache(cache.getName());
+ }
+ }
+
+ /**
+ *
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @param filter Filter enabled.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode, boolean filter) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ if (filter)
+ ccfg.setNodeFilter(new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return !node.attributes().get(ATTR_GRID_NAME).equals(SERVER2);
+ }
+ });
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ protected static class FailedTcpCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private boolean check;
+
+ /** */
+ private boolean periodicCheck;
+
+ /**
+ * @param alwaysCheck Always check inbound message.
+ * @param periodicCheck Check when {@code filterOn} enabled.
+ */
+ public FailedTcpCommunicationSpi(boolean alwaysCheck, boolean periodicCheck) {
+ this.check = alwaysCheck;
+ this.periodicCheck = periodicCheck;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
+ if (check || (periodicCheck && filterOn.get())) {
+ if (msg instanceof GridIoMessage &&
+ ((GridIoMessage)msg).message() instanceof CacheContinuousQueryBatchAck)
+ fail.set(true);
+ }
+
+ super.notifyListener(sndId, msg, msgC);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/675a7c1f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java
new file mode 100644
index 0000000..f1794fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Continuous queries tests.
+ */
+public class CacheContinuousBatchForceServerModeAckTest extends CacheContinuousBatchAckTest implements Serializable {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.endsWith(CLIENT)) {
+ cfg.setClientMode(true);
+
+ FailedTcpCommunicationSpi spi = new FailedTcpCommunicationSpi(true, false);
+
+ cfg.setCommunicationSpi(spi);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setForceServerMode(true);
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+ }
+ else if (gridName.endsWith(SERVER2)) {
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, true));
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+ }
+ else {
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, false));
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+ }
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/675a7c1f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index 30613a4..f4659dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -138,7 +138,9 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
}, 2000L) : "Expected no create events, but got: " + lsnr2.createdCnt.get();
// node2 now becomes the primary for the key.
- grid(0).close();
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
cache2.put(key, "2");
http://git-wip-us.apache.org/repos/asf/ignite/blob/675a7c1f/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 84a9a45..c67a8cf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -72,6 +72,8 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
@@ -214,6 +216,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+ suite.addTestSuite(CacheContinuousBatchAckTest.class);
+ suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);