You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/15 15:39:07 UTC
ignite git commit: IGNITE-2604 Fixed review notes.
Repository: ignite
Updated Branches:
refs/heads/ignite-2604 155af4982 -> 0eb91541d
IGNITE-2604 Fixed review notes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0eb91541
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0eb91541
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0eb91541
Branch: refs/heads/ignite-2604
Commit: 0eb91541d48638bbd2c85b044ea695c2fe3c45d8
Parents: 155af49
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Feb 15 17:39:01 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Feb 15 17:39:01 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousBatchAckTest.java | 364 ++++---------
...heContinuousBatchForceServerModeAckTest.java | 80 +++
.../CacheContinuousCacheFilterBatchAckTest.java | 531 -------------------
3 files changed, 180 insertions(+), 795 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0eb91541/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
index f683ffd..c69ccf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java
@@ -28,9 +28,11 @@ import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -48,6 +50,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
/**
* Continuous queries tests.
@@ -57,16 +60,19 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final String CLIENT = "_client";
+ protected static final String CLIENT = "_client";
/** */
- private static final String SERVER = "server";
+ protected static final String SERVER = "server";
/** */
- private static final String SERVER2 = "server2";
+ protected static final String SERVER2 = "server2";
/** */
- private static final AtomicBoolean fail = new AtomicBoolean(false);
+ protected static final AtomicBoolean fail = new AtomicBoolean(false);
+
+ /** */
+ protected static final AtomicBoolean filterOn = new AtomicBoolean(false);
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@@ -76,10 +82,12 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
if (gridName.endsWith(CLIENT)) {
cfg.setClientMode(true);
- cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true));
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true, false));
}
+ else if (gridName.endsWith(SERVER2))
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, true));
else
- cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false));
+ cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, false));
TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -112,330 +120,143 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
super.beforeTest();
fail.set(false);
+
+ filterOn.set(false);
}
/**
* @throws Exception If failed.
*/
public void testPartition() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, false));
+ }
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionWithFilter() throws Exception {
+ filterOn.set(true);
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, true));
}
/**
* @throws Exception If failed.
*/
public void testPartitionNoBackups() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED, false));
}
/**
* @throws Exception If failed.
*/
public void testPartitionTx() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED));
-
- qry = cache.query(q);
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+ }
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxWithFilter() throws Exception {
+ filterOn.set(true);
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, true));
}
/**
* @throws Exception If failed.
*/
public void testPartitionTxNoBackup() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED));
-
- qry = cache.query(q);
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+ }
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxNoBackupWithFilter() throws Exception {
+ filterOn.set(true);
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, true));
}
/**
* @throws Exception If failed.
*/
public void testPartitionOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, false));
+ }
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionOffheapWithFilter() throws Exception {
+ filterOn.set(true);
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, true));
}
/**
* @throws Exception If failed.
*/
public void testPartitionTxOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
}
/**
* @throws Exception If failed.
*/
public void testReplicated() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED, false));
}
/**
* @throws Exception If failed.
*/
public void testReplicatedTx() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+ }
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTxWithFilter() throws Exception {
+ filterOn.set(true);
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED, true));
}
/**
* @throws Exception If failed.
*/
public void testReplicatedOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED));
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED, false));
+ }
- qry = cache.query(q);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTxOffheap() throws Exception {
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+ }
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTxOffheapWithFilter() throws Exception {
+ filterOn.set(true);
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
+ checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, OFFHEAP_TIERED, true));
}
/**
+ * @param ccfg Cache configuration.
* @throws Exception If failed.
*/
- public void testReplicatedTxOffheap() throws Exception {
+ private void checkBackupAcknowledgeMessage(CacheConfiguration<Object, Object> ccfg) throws Exception {
QueryCursor qry = null;
+ IgniteCache<Object, Object> cache = null;
+
try {
ContinuousQuery q = new ContinuousQuery();
@@ -445,8 +266,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
}
});
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
+ cache = grid(SERVER).getOrCreateCache(ccfg);
qry = cache.query(q);
@@ -457,11 +277,14 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
@Override public boolean apply() {
return fail.get();
}
- }, 2000L);
+ }, 1300L);
}
finally {
if (qry != null)
qry.close();
+
+ if (cache != null)
+ grid(SERVER).destroyCache(cache.getName());
}
}
@@ -471,13 +294,14 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
* @param backups Number of backups.
* @param atomicityMode Cache atomicity mode.
* @param memoryMode Cache memory mode.
+ * @param filter Filter enabled.
* @return Cache configuration.
*/
private CacheConfiguration<Object, Object> cacheConfiguration(
CacheMode cacheMode,
int backups,
CacheAtomicityMode atomicityMode,
- CacheMemoryMode memoryMode) {
+ CacheMemoryMode memoryMode, boolean filter) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
ccfg.setAtomicityMode(atomicityMode);
@@ -488,6 +312,13 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
+ if (filter)
+ ccfg.setNodeFilter(new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return !node.attributes().get(ATTR_GRID_NAME).equals(SERVER2);
+ }
+ });
+
return ccfg;
}
@@ -498,16 +329,21 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen
/** */
private boolean check;
+ /** */
+ private boolean periodicCheck;
+
/**
- * @param check Check inbound message.
+ * @param alwaysCheck Always check inbound message.
+ * @param periodicCheck Check when {@code filterOn} enabled.
*/
- public FailedTcpCommunicationSpi(boolean check) {
- this.check = check;
+ public FailedTcpCommunicationSpi(boolean alwaysCheck, boolean periodicCheck) {
+ this.check = alwaysCheck;
+ this.periodicCheck = periodicCheck;
}
/** {@inheritDoc} */
@Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
- if (check) {
+ if (check || (periodicCheck && filterOn.get())) {
if (msg instanceof GridIoMessage &&
((GridIoMessage)msg).message() instanceof CacheContinuousQueryBatchAck)
fail.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0eb91541/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java
new file mode 100644
index 0000000..f1794fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Variant of {@link CacheContinuousBatchAckTest} whose client nodes run discovery in forced
+ * server mode (see {@link TcpDiscoverySpi#setForceServerMode(boolean)}).
+ * <p>
+ * All test methods are inherited; only the node configuration differs.
+ */
+public class CacheContinuousBatchForceServerModeAckTest extends CacheContinuousBatchAckTest implements Serializable {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Builds the node configuration on top of the parent one.
+     * <p>
+     * The communication SPI is chosen by grid name suffix:
+     * <ul>
+     * <li>{@code CLIENT} - client node that always checks inbound messages;</li>
+     * <li>{@code SERVER2} - node that checks inbound messages only while {@code filterOn} is set;</li>
+     * <li>any other name - node that never checks inbound messages.</li>
+     * </ul>
+     * <p>
+     * The discovery SPI uses this class's IP finder; for client nodes it is additionally
+     * switched to forced server mode.
+     *
+     * @param gridName Grid name.
+     * @return Grid configuration.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        if (gridName.endsWith(CLIENT)) {
+            cfg.setClientMode(true);
+
+            cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true, false));
+
+            // Start discovery in server mode even though the node itself is a client.
+            disco.setForceServerMode(true);
+        }
+        else if (gridName.endsWith(SERVER2))
+            cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, true));
+        else
+            cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, false));
+
+        // Installed exactly once: replacing it with a fresh SPI afterwards would discard
+        // the force-server-mode flag set for client nodes.
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0eb91541/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java
deleted file mode 100644
index fb094b9..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.Serializable;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PA;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
-
-/**
- * Continuous queries tests.
- */
-public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTest implements Serializable {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final String CLIENT = "_client";
-
- /** */
- private static final String SERVER = "server";
-
- /** */
- private static final String SERVER2 = "server2";
-
- /** */
- private static final AtomicBoolean fail = new AtomicBoolean(false);
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- if (gridName.endsWith(CLIENT)) {
- cfg.setClientMode(true);
-
- cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true));
- }
- else if (gridName.endsWith(SERVER2))
- cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true));
- else
- cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false));
-
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(disco);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrid(SERVER);
- startGrid(SERVER2);
- startGrid("1" + CLIENT);
- startGrid("2" + CLIENT);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- fail.set(false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartition() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionNoBackups() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionTx() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionTxNoBackup() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
-
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionTxOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicated() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedTx() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedTxOffheap() throws Exception {
- QueryCursor qry = null;
-
- try {
- ContinuousQuery q = new ContinuousQuery();
-
- q.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- // No-op.
- }
- });
-
- IgniteCache<Object, Object> cache =
- grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED));
-
- qry = cache.query(q);
-
- for (int i = 0; i < 10000; i++)
- cache.put(i, i);
-
- assert !GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return fail.get();
- }
- }, 2000L);
- }
- finally {
- if (qry != null)
- qry.close();
- }
- }
-
- /**
- *
- * @param cacheMode Cache mode.
- * @param backups Number of backups.
- * @param atomicityMode Cache atomicity mode.
- * @param memoryMode Cache memory mode.
- * @return Cache configuration.
- */
- private CacheConfiguration<Object, Object> cacheConfiguration(
- CacheMode cacheMode,
- int backups,
- CacheAtomicityMode atomicityMode,
- CacheMemoryMode memoryMode) {
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setAtomicityMode(atomicityMode);
- ccfg.setCacheMode(cacheMode);
- ccfg.setMemoryMode(memoryMode);
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
- ccfg.setNodeFilter(new P1<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return !node.attributes().get(ATTR_GRID_NAME).equals(SERVER2);
- }
- });
-
- if (cacheMode == PARTITIONED)
- ccfg.setBackups(backups);
-
- return ccfg;
- }
-
- /**
- *
- */
- protected static class FailedTcpCommunicationSpi extends TcpCommunicationSpi {
- /** */
- private boolean check;
-
- /**
- * @param check Check inbound message.
- */
- public FailedTcpCommunicationSpi(boolean check) {
- this.check = check;
- }
-
- /** {@inheritDoc} */
- @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
- if (check) {
- if (msg instanceof GridIoMessage &&
- ((GridIoMessage)msg).message() instanceof CacheContinuousQueryBatchAck)
- fail.set(true);
- }
-
- super.notifyListener(sndId, msg, msgC);
- }
- }
-}