You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/15 14:42:54 UTC
[2/5] ignite git commit: IGNITE-7953: MVCC: Continuous queries
support. This closes #4767.
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
index ce45570..842b6f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -104,6 +105,18 @@ public class CacheKeepBinaryIterationTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testMvccTxOnHeap() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,1, TRANSACTIONAL_SNAPSHOT);
+
+ doTestScanQuery(ccfg, true, true);
+ doTestScanQuery(ccfg, true, false);
+ doTestScanQuery(ccfg, false, true);
+ doTestScanQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicOnHeapLocalEntries() throws Exception {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, ATOMIC);
@@ -126,6 +139,19 @@ public class CacheKeepBinaryIterationTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testMvccTxOnHeapLocalEntries() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT);
+
+ doTestLocalEntries(ccfg, true, true);
+ doTestLocalEntries(ccfg, true, false);
+ doTestLocalEntries(ccfg, false, true);
+ doTestLocalEntries(ccfg, false, false);
+ }
+
+
+ /**
* @param ccfg Cache configuration.
*/
private void doTestScanQuery(final CacheConfiguration<Object, Object> ccfg, boolean keepBinary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
index 9b531c6..9ec25d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -77,6 +78,12 @@ public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
else {
CacheConfiguration ccfg = defaultCacheConfiguration();
+ ccfg.setAtomicityMode(atomicityMode());
+
+ // TODO IGNITE-9530 Remove this clause.
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ ccfg.setNearConfiguration(null);
+
cfg.setCacheConfiguration(ccfg);
}
@@ -84,6 +91,13 @@ public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
}
/**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /**
* Test client reconnect to alive grid.
*
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 0ace0ba..707ef4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -78,6 +78,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -112,13 +113,18 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
cacheCfg.setCacheMode(cacheMode());
cacheCfg.setAtomicityMode(atomicityMode());
- cacheCfg.setNearConfiguration(nearConfiguration());
+ cacheCfg.setLoadPreviousValue(true);
cacheCfg.setRebalanceMode(ASYNC);
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
- cacheCfg.setCacheStoreFactory(new StoreFactory());
- cacheCfg.setReadThrough(true);
- cacheCfg.setWriteThrough(true);
- cacheCfg.setLoadPreviousValue(true);
+ cacheCfg.setNearConfiguration(nearConfiguration());
+
+ if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) {
+ cacheCfg.setCacheStoreFactory(new StoreFactory()); // TODO IGNITE-8582 enable for tx snapshot.
+ cacheCfg.setReadThrough(true); // TODO IGNITE-8582 enable for tx snapshot.
+ cacheCfg.setWriteThrough(true); // TODO IGNITE-8582 enable for tx snapshot.
+ }
+ else
+ cacheCfg.setIndexedTypes(Integer.class, Integer.class);
cfg.setCacheConfiguration(cacheCfg);
}
@@ -240,6 +246,23 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
protected abstract int gridCount();
/**
+ * @param cache Cache.
+ * @param key Key.
+ * @param val Value.
+ */
+ protected void cachePut(IgniteCache cache, Integer key, Integer val) {
+ cache.put(key, val);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ */
+ protected void cacheRemove(IgniteCache cache, Integer key) {
+ cache.remove(key);
+ }
+
+ /**
* @throws Exception If failed.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@@ -314,13 +337,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
});
try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
- cache.put(1, 1);
- cache.put(2, 2);
- cache.put(3, 3);
+ cachePut(cache,1, 1);
+ cachePut(cache,2, 2);
+ cachePut(cache,3, 3);
- cache.remove(2);
+ cacheRemove(cache, 2);
- cache.put(1, 10);
+ cachePut(cache, 1, 10);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -370,7 +393,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
for (int i = 0; i < 100; i++)
- cache.put(i, i);
+ cachePut(cache, i, i);
}
}
@@ -409,13 +432,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Object, Object> cache0 = grid(i).cache(DEFAULT_CACHE_NAME);
- cache0.put(1, 1);
- cache0.put(2, 2);
- cache0.put(3, 3);
+ cachePut(cache0, 1, 1);
+ cachePut(cache0, 2, 2);
+ cachePut(cache0, 3, 3);
- cache0.remove(1);
- cache0.remove(2);
- cache0.remove(3);
+ cacheRemove(cache0, 1);
+ cacheRemove(cache0, 2);
+ cacheRemove(cache0, 3);
final int iter = i + 1;
@@ -467,7 +490,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
final int keyCnt = parts * 2;
for (int i = 0; i < parts / 2; i++)
- cache.put(i, i);
+ cachePut(cache, i, i);
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
@@ -486,7 +509,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry);
for (int key = 0; key < keyCnt; key++)
- cache.put(key, key);
+ cachePut(cache, key, key);
try {
assert GridTestUtils.waitForCondition(new PA() {
@@ -501,7 +524,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
else {
for (int key = 0; key < keyCnt; key++)
- cache.put(key, key);
+ cachePut(cache, key, key);
}
}
}
@@ -544,16 +567,16 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
});
try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
- cache.put(1, 1);
- cache.put(2, 2);
- cache.put(3, 3);
- cache.put(4, 4);
+ cachePut(cache, 1, 1);
+ cachePut(cache, 2, 2);
+ cachePut(cache, 3, 3);
+ cachePut(cache, 4, 4);
- cache.remove(2);
- cache.remove(3);
+ cacheRemove(cache, 2);
+ cacheRemove(cache, 3);
- cache.put(1, 10);
- cache.put(4, 40);
+ cachePut(cache, 1, 10);
+ cachePut(cache, 4, 40);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -631,8 +654,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
break;
}
- cache.put(locKey, 1);
- cache.put(rmtKey, 2);
+ cachePut(cache, locKey, 1);
+ cachePut(cache, rmtKey, 2);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -706,12 +729,12 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
Iterator<Integer> it = keys.iterator();
for (int i = 0; i < 4; i++)
- cache.put(it.next(), 0);
+ cachePut(cache, it.next(), 0);
assert !latch.await(2, SECONDS);
for (int i = 0; i < 2; i++)
- cache.put(it.next(), 0);
+ cachePut(cache, it.next(), 0);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -790,7 +813,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}
for (Integer k : keys)
- cache.put(k, 0);
+ cachePut(cache, k, 0);
assert !latch.await(2, SECONDS);
assert latch.await(1000 + LATCH_TIMEOUT, MILLISECONDS);
@@ -832,7 +855,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
});
for (int i = 0; i < 10; i++)
- cache.put(i, i);
+ cachePut(cache, i, i);
try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) {
List<Cache.Entry<Integer, Integer>> res = cur.getAll();
@@ -884,7 +907,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
});
for (int i = 0; i < 10; i++)
- cache.put(i, i);
+ cachePut(cache, i, i);
try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) {
List<Cache.Entry<Integer, Integer>> res = cur.getAll();
@@ -906,8 +929,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
exp++;
}
- cache.put(10, 10);
- cache.put(11, 11);
+ cachePut(cache, 10, 10);
+ cachePut(cache, 11, 11);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount();
@@ -978,8 +1001,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
cache.put(new GridCacheInternalKeyImpl("test", "test"), 1);
- cache.put(1, 1);
- cache.put(2, 2);
+ cachePut(cache, 1, 1);
+ cachePut(cache, 2, 2);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -1014,7 +1037,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
log.info("Started node without cache: " + ignite);
}
- cache.put(1, 1);
+ cachePut(cache, 1, 1);
assertTrue(latch.await(5000, MILLISECONDS));
}
@@ -1102,7 +1125,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
for (int i = 0; i < 100; i++)
- cache.put(i, i);
+ cachePut(cache, i, i);
assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
assert execLatch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -1147,8 +1170,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
});
try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
- cache.put(1, 1);
- cache.put(2, 2);
+ cachePut(cache, 1, 1);
+ cachePut(cache, 2, 2);
// Wait for expiration.
Thread.sleep(2000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
index 0241a69..9dca5ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -106,6 +107,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
/**
* @throws Exception If failed.
*/
+ public void testReplicatedMvccTx() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, 1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRestartReplicated() throws Exception {
testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
}
@@ -127,6 +135,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
/**
* @throws Exception If failed.
*/
+ public void testRestartPartitionMvccTx() throws Exception {
+ testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testReplicatedAtomic() throws Exception {
testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
}
@@ -141,6 +156,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
/**
* @throws Exception If failed.
*/
+ public void testPartitionMvccTx() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testPartitionAtomic() throws Exception {
testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
}
@@ -342,17 +364,44 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
// were busy setting up the cache listener.
// Check asynchronously.
// Complete the promise if the key was inserted concurrently.
- cache.getAsync(key).listen(new IgniteInClosure<IgniteFuture<String>>() {
- @Override public void apply(IgniteFuture<String> f) {
- String val = f.get();
+ if (!((IgniteCacheProxy)cache).context().mvccEnabled()) {
+ cache.getAsync(key).listen(new IgniteInClosure<IgniteFuture<String>>() {
+ @Override public void apply(IgniteFuture<String> f) {
+ String val = f.get();
- if (val != null) {
- log.info("Completed by get: " + id);
+ if (val != null) {
+ log.info("Completed by get: " + id);
- (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+ (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by async get");
+ }
}
- }
- });
+ });
+ }
+ else {
+ // For MVCC caches we need to wait until the updated value becomes visible to subsequent readers.
+ // When an MVCC transaction completes, its updates are not immediately visible to new transactions.
+ // This is caused by the lag between the transaction completing on the node and the MVCC coordinator
+ // removing this transaction from the active list.
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ String v;
+
+ while (!Thread.currentThread().isInterrupted()) {
+ v = cache.get(key);
+
+ if (v == null)
+ doSleep(100);
+ else {
+ log.info("Completed by async mvcc get: " + id);
+
+ (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+
+ break;
+ }
+ }
+ }
+ });
+ }
return promise;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
index b316042..0eb2e87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
@@ -40,6 +40,7 @@ import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
@@ -318,11 +319,18 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
return new CacheConfiguration("test-cache-cq")
.setBackups(1)
.setNodeFilter(filter)
- .setAtomicityMode(ATOMIC)
+ .setAtomicityMode(atomicityMode())
.setWriteSynchronizationMode(FULL_SYNC)
.setCacheMode(PARTITIONED);
}
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
/** */
private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration {
/** Operation. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
index 6474df5..d02c6ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
@@ -55,6 +57,10 @@ public class GridCacheContinuousQueryReplicatedTxOneNodeTest extends GridCommonA
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ // TODO IGNITE-9530 Remove this clause.
+ if (atomicMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ cacheCfg.setNearConfiguration(null);
+
cfg.setCacheConfiguration(cacheCfg);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -164,7 +170,14 @@ public class GridCacheContinuousQueryReplicatedTxOneNodeTest extends GridCommonA
for (int i = 0; i < 10; i++)
cache.put("key" + i, i);
- cache.clear();
+ if (atomicMode() != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+ cache.clear();
+ else { // TODO IGNITE-7952. Remove "else" clause - do cache.clear() instead of iteration.
+ for (Iterator it = cache.iterator(); it.hasNext();) {
+ it.next();
+ it.remove();
+ }
+ }
qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts)
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index 5baa3a7..a8f0d71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -30,6 +30,7 @@ import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -98,6 +99,13 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
return cfg;
}
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index 1e40170..ab0cb50 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -61,7 +62,7 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(PARTITIONED);
- ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicityMode(atomicityMode());
ccfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg);
@@ -71,6 +72,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
return cfg;
}
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index 81a7515..4e50cb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -25,6 +25,7 @@ import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -66,7 +67,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(PARTITIONED);
- ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicityMode(atomicityMode());
ccfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg);
@@ -76,6 +77,13 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
return cfg;
}
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
index d9804bf..74f80df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
@@ -480,11 +480,6 @@ public class CacheFreeListImplSelfTest extends GridCommonAbstractTest {
@Override public byte newMvccTxState() {
return 0;
}
-
- /** {@inheritDoc} */
- @Override public boolean isKeyAbsentBefore() {
- return false;
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 344a1cc..c8cf9aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -81,6 +81,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCacheSharedTtlCleanupManager(),
new PartitionsEvictManager(),
new CacheNoopJtaManager(),
+ null,
null
),
defaultCacheConfiguration(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 6246aa5..7fb896f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -127,11 +127,6 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD
}
/** {@inheritDoc} */
- @Override public boolean isKeyAbsentBefore() {
- return row.isKeyAbsentBefore();
- }
-
- /** {@inheritDoc} */
@Override public boolean indexSearchRow() {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java
new file mode 100644
index 0000000..1418d47
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+/**
+ * Base class for continuous query tests that run with the {@code TRANSACTIONAL_SNAPSHOT} (MVCC)
+ * atomicity mode, two grids and a {@code null} near-cache configuration.
+ */
+public abstract class CacheMvccAbstractContinuousQuerySelfTest extends GridCacheContinuousQueryAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testInternalKey() throws Exception {
+        // No-op: the inherited scenario is deliberately not executed by this class.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testExpired() throws Exception {
+        // Fails unconditionally, pointing at the tracking ticket.
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testLoadCache() throws Exception {
+        // Fails unconditionally, pointing at the tracking ticket.
+        fail("https://issues.apache.org/jira/browse/IGNITE-7954");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testEvents() throws Exception {
+        // Fails unconditionally, pointing at the tracking ticket.
+        fail("https://issues.apache.org/jira/browse/IGNITE-9321");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java
new file mode 100644
index 0000000..96fdf06
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+/**
+ * Base class for MVCC continuous query tests whose puts and removes are issued as SQL DML
+ * statements ({@code MERGE} and {@code DELETE}).
+ */
+public abstract class CacheMvccAbstractSqlContinuousQuerySelfTest extends CacheMvccAbstractContinuousQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected void cachePut(IgniteCache cache, Integer key, Integer val) {
+        // The put is expressed as a MERGE statement over the Integer table.
+        SqlFieldsQuery mergeQry =
+            new SqlFieldsQuery("MERGE INTO Integer (_key, _val) values (" + key + ',' + val + ')');
+
+        cache.query(mergeQry).getAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cacheRemove(IgniteCache cache, Integer key) {
+        // The remove is expressed as a DELETE statement filtered by key.
+        SqlFieldsQuery deleteQry = new SqlFieldsQuery("DELETE FROM Integer WHERE _key=" + key);
+
+        cache.query(deleteQry).getAll();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
new file mode 100644
index 0000000..ed97b1b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Basic continuous query tests for caches with MVCC enabled.
+ * <p>
+ * The tests modify data with SQL DML inside explicit {@code PESSIMISTIC}/{@code REPEATABLE_READ} transactions.
+ */
+public class CacheMvccBasicContinuousQueryTest extends CacheMvccAbstractTest {
+    /** Maximum time to wait for the expected listener notifications, in milliseconds. */
+    private static final long LATCH_TIMEOUT = 5000;
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /**
+     * Checks on every started node that continuous query and MVCC caching state has been cleaned up.
+     * The internal maps are read reflectively through {@code U.field}.
+     *
+     * @throws Exception If failed.
+     */
+    @Override protected void afterTest() throws Exception {
+        // Wait until all remote routines are unregistered. The result is not checked here:
+        // the rmtInfos assertion below fails if the wait ran out.
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                for (Ignite node : G.allGrids()) {
+                    GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
+
+                    if (((Map)U.field(proc, "rmtInfos")).size() > 0)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 3000);
+
+        for (Ignite node : G.allGrids()) {
+            GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
+
+            // NOTE(review): exactly one local routine is expected to remain; presumably an internal one - confirm.
+            assertEquals(1, ((Map)U.field(proc, "locInfos")).size());
+            assertEquals(0, ((Map)U.field(proc, "rmtInfos")).size());
+            assertEquals(0, ((Map)U.field(proc, "startFuts")).size());
+            assertEquals(0, ((Map)U.field(proc, "stopFuts")).size());
+            assertEquals(0, ((Map)U.field(proc, "bufCheckThreads")).size());
+
+            CacheContinuousQueryManager mgr = ((IgniteEx)node).context().cache().internalCache(DEFAULT_CACHE_NAME).context().continuousQueries();
+
+            assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size());
+
+            MvccCachingManager cachingMgr = ((IgniteEx)node).context().cache().context().mvccCaching();
+
+            assertEquals(0, ((Map)U.field(cachingMgr, "enlistCache")).size());
+            assertEquals(0, ((Map)U.field(cachingMgr, "cntrs")).size());
+        }
+
+        super.afterTest();
+    }
+
+    /**
+     * Registers a continuous query on a replicated cache, runs two committed transactions and one
+     * rolled-back transaction, and checks that the listener recorded exactly five events:
+     * key 1 with values [1, 10], key 2 with values [2, 2] and key 3 with value [3].
+     * Keys 4 and 5 from the rolled-back transaction must not appear.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAllEntries() throws Exception {
+        Ignite node = startGrids(3);
+
+        final IgniteCache cache = node.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 1, 2)
+                .setCacheMode(CacheMode.REPLICATED)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        // Values observed per key, in notification order. Guarded by synchronized (map).
+        final Map<Integer, List<Integer>> map = new HashMap<>();
+
+        // Five committed changes are expected: 2 inserts, then merge + delete + update.
+        final CountDownLatch latch = new CountDownLatch(5);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        map.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                String dml = "INSERT INTO Integer (_key, _val) values (1,1),(2,2)";
+
+                cache.query(new SqlFieldsQuery(dml)).getAll();
+
+                tx.commit();
+            }
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                String dml1 = "MERGE INTO Integer (_key, _val) values (3,3)";
+
+                cache.query(new SqlFieldsQuery(dml1)).getAll();
+
+                String dml2 = "DELETE FROM Integer WHERE _key = 2";
+
+                cache.query(new SqlFieldsQuery(dml2)).getAll();
+
+                String dml3 = "UPDATE Integer SET _val = 10 WHERE _key = 1";
+
+                cache.query(new SqlFieldsQuery(dml3)).getAll();
+
+                tx.commit();
+            }
+
+            // This transaction is rolled back, so it must not produce any events.
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                String dml = "INSERT INTO Integer (_key, _val) values (4,4),(5,5)";
+
+                cache.query(new SqlFieldsQuery(dml)).getAll();
+
+                tx.rollback();
+            }
+
+            // Use a JUnit assertion rather than the 'assert' statement: with JVM assertions disabled
+            // the 'assert' form would skip the wait entirely.
+            assertTrue("Timed out waiting for continuous query events, remaining: " + latch.getCount(),
+                latch.await(LATCH_TIMEOUT, MILLISECONDS));
+
+            assertEquals(3, map.size());
+
+            List<Integer> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(1, (int)vals.get(0));
+            assertEquals(10, (int)vals.get(1));
+
+            // Key 2 was inserted and then deleted; both recorded values are expected to be 2.
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(2, (int)vals.get(0));
+            assertEquals(2, (int)vals.get(1));
+
+            vals = map.get(3);
+
+            assertNotNull(vals);
+            assertEquals(1, vals.size());
+            assertEquals(3, (int)vals.get(0));
+        }
+    }
+
+    /**
+     * Inserts {@code TX_SIZE_THRESHOLD + 1} rows in a single transaction while a continuous query is
+     * registered and expects a {@link CacheException} reporting that the transaction is too large.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCachingMaxSize() throws Exception {
+        Ignite node = startGrids(1);
+
+        final IgniteCache cache = node.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 1, 2)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                // No-op.
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        // One row more than the threshold.
+                        for (int i = 0; i < TX_SIZE_THRESHOLD + 1; i++)
+                            cache.query(new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values (" + i + ", 1)")).getAll();
+
+                        tx.commit();
+                    }
+                }
+
+                return null;
+            }
+        }, CacheException.class, "Failed to run update. Transaction is too large. Consider reducing transaction size");
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java
new file mode 100644
index 0000000..33e0960
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest;
+
+/**
+ * Variant of {@link ClientReconnectContinuousQueryTest} that uses the {@code TRANSACTIONAL_SNAPSHOT} atomicity mode.
+ */
+public class CacheMvccClientReconnectContinuousQueryTest extends ClientReconnectContinuousQueryTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java
new file mode 100644
index 0000000..3a598a2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryBackupQueueTest;
+
+/**
+ * Variant of {@link IgniteCacheContinuousQueryBackupQueueTest} that uses the {@code TRANSACTIONAL_SNAPSHOT} atomicity mode.
+ */
+public class CacheMvccContinuousQueryBackupQueueTest extends IgniteCacheContinuousQueryBackupQueueTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
new file mode 100644
index 0000000..4c4c95b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+/**
+ * MVCC variant of {@link IgniteCacheContinuousQueryClientReconnectTest}: the atomicity mode is {@code TRANSACTIONAL_SNAPSHOT}.
+ */
+public class CacheMvccContinuousQueryClientReconnectTest extends IgniteCacheContinuousQueryClientReconnectTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicMode() {
+ return TRANSACTIONAL_SNAPSHOT;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java
new file mode 100644
index 0000000..5c6c7a8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+
+/**
+ * MVCC variant of {@link IgniteCacheContinuousQueryClientTest}: the atomicity mode is {@code TRANSACTIONAL_SNAPSHOT}.
+ */
+public class CacheMvccContinuousQueryClientTest extends IgniteCacheContinuousQueryClientTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java
new file mode 100644
index 0000000..bef9c70
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryImmutableEntryTest;
+
+/**
+ * Variant of {@link IgniteCacheContinuousQueryImmutableEntryTest} that uses the {@code TRANSACTIONAL_SNAPSHOT} atomicity mode.
+ */
+public class CacheMvccContinuousQueryImmutableEntryTest extends IgniteCacheContinuousQueryImmutableEntryTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java
new file mode 100644
index 0000000..714e834
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest;
+
+/**
+ * Variant of {@link GridCacheContinuousQueryMultiNodesFilteringTest} that uses the {@code TRANSACTIONAL_SNAPSHOT} atomicity mode.
+ */
+public class CacheMvccContinuousQueryMultiNodesFilteringTest extends GridCacheContinuousQueryMultiNodesFilteringTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java
new file mode 100644
index 0000000..80b039d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * {@link CacheMvccAbstractContinuousQuerySelfTest} specialization that uses a {@code PARTITIONED} cache.
+ */
+public class CacheMvccContinuousQueryPartitionedSelfTest extends CacheMvccAbstractContinuousQuerySelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java
new file mode 100644
index 0000000..795932e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
+
+/**
+ * MVCC one-node continuous query test: reuses {@link GridCacheContinuousQueryReplicatedTxOneNodeTest} but overrides
+ * the atomicity mode to {@code TRANSACTIONAL_SNAPSHOT} and the cache mode to {@code PARTITIONED}.
+ */
+public class CacheMvccContinuousQueryPartitionedTxOneNodeTest extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java
new file mode 100644
index 0000000..c9adbf9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * {@link CacheMvccAbstractContinuousQuerySelfTest} specialization that uses a {@code REPLICATED} cache.
+ */
+public class CacheMvccContinuousQueryReplicatedSelfTest extends CacheMvccAbstractContinuousQuerySelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java
new file mode 100644
index 0000000..d522ee9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
+
+/**
+ * MVCC one-node continuous query test for a replicated cache in {@code TRANSACTIONAL_SNAPSHOT} atomicity mode.
+ */
+public class CacheMvccContinuousQueryReplicatedTxOneNodeTest extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java
new file mode 100644
index 0000000..a3ea0e8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerClientSelfTest;
+
+/**
+ * Variant of {@link CacheContinuousWithTransformerClientSelfTest} using {@code TRANSACTIONAL_SNAPSHOT} atomicity mode.
+ */
+public class CacheMvccContinuousWithTransformerClientSelfTest extends CacheContinuousWithTransformerClientSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** Always fails with a link to IGNITE-7311 instead of running the inherited test. */
+ @Override public void testExpired() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java
new file mode 100644
index 0000000..d029143
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest;
+
+/**
+ * MVCC transformer test for a partitioned cache; reuses the replicated base class and overrides the cache mode.
+ */
+public class CacheMvccContinuousWithTransformerPartitionedSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** Always fails with a link to IGNITE-7311 instead of running the inherited test. */
+ @Override public void testExpired() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java
new file mode 100644
index 0000000..a294e17
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest;
+
+/**
+ * Variant of {@link CacheContinuousWithTransformerReplicatedSelfTest} using {@code TRANSACTIONAL_SNAPSHOT} atomicity.
+ */
+public class CacheMvccContinuousWithTransformerReplicatedSelfTest
+ extends CacheContinuousWithTransformerReplicatedSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** Always fails with a link to IGNITE-7311 instead of running the inherited test. */
+ @Override public void testExpired() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java
new file mode 100644
index 0000000..cef553e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * MVCC SQL continuous query test: runs {@link CacheMvccAbstractSqlContinuousQuerySelfTest} in PARTITIONED cache mode.
+ */
+public class CacheMvccSqlContinuousQueryPartitionedSelfTest extends CacheMvccAbstractSqlContinuousQuerySelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java
new file mode 100644
index 0000000..948e6e1
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * MVCC SQL continuous query test: runs {@link CacheMvccAbstractSqlContinuousQuerySelfTest} in REPLICATED cache mode.
+ */
+public class CacheMvccSqlContinuousQueryReplicatedSelfTest extends CacheMvccAbstractSqlContinuousQuerySelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index b5cb3e0..ce2a130 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -20,7 +20,21 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.index.MvccEmptyTransactionSelfTest;
import org.apache.ignite.internal.processors.cache.index.SqlTransactionsCommandsWithMvccEnabledSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBasicContinuousQueryTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBulkLoadTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccClientReconnectContinuousQueryTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryBackupQueueTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryClientReconnectTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryImmutableEntryTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryMultiNodesFilteringTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryPartitionedTxOneNodeTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryReplicatedTxOneNodeTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousWithTransformerClientSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousWithTransformerPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousWithTransformerReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccDmlSimpleTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccIteratorWithConcurrentJdbcTransactionTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest;
@@ -40,8 +54,10 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithCo
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlConfigurationValidationTest;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlContinuousQueryPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlContinuousQueryReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest;
@@ -102,6 +118,28 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
suite.addTestSuite(CacheMvccPartitionedSqlCoordinatorFailoverTest.class);
suite.addTestSuite(CacheMvccReplicatedSqlCoordinatorFailoverTest.class);
+ // Continuous queries.
+ suite.addTestSuite(CacheMvccBasicContinuousQueryTest.class);
+ suite.addTestSuite(CacheMvccContinuousQueryPartitionedSelfTest.class);
+ suite.addTestSuite(CacheMvccContinuousQueryReplicatedSelfTest.class);
+ suite.addTestSuite(CacheMvccSqlContinuousQueryPartitionedSelfTest.class);
+ suite.addTestSuite(CacheMvccSqlContinuousQueryReplicatedSelfTest.class);
+
+ suite.addTestSuite(CacheMvccContinuousQueryPartitionedTxOneNodeTest.class);
+ suite.addTestSuite(CacheMvccContinuousQueryReplicatedTxOneNodeTest.class);
+
+ suite.addTestSuite(CacheMvccContinuousQueryClientReconnectTest.class);
+ suite.addTestSuite(CacheMvccContinuousQueryClientTest.class);
+
+ suite.addTestSuite(CacheMvccContinuousQueryMultiNodesFilteringTest.class);
+ suite.addTestSuite(CacheMvccContinuousQueryBackupQueueTest.class);
+ suite.addTestSuite(CacheMvccContinuousQueryImmutableEntryTest.class);
+ suite.addTestSuite(CacheMvccClientReconnectContinuousQueryTest.class);
+
+ suite.addTestSuite(CacheMvccContinuousWithTransformerClientSelfTest.class);
+ suite.addTestSuite(CacheMvccContinuousWithTransformerPartitionedSelfTest.class);
+ suite.addTestSuite(CacheMvccContinuousWithTransformerReplicatedSelfTest.class);
+
return suite;
}
}