You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/16 17:12:07 UTC
[1/2] ignite git commit: IGNITE-2791 Updated test.
Repository: ignite
Updated Branches:
refs/heads/ignite-2791 ecb2d58e0 -> b794817a7
IGNITE-2791 Updated test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f97e709b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f97e709b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f97e709b
Branch: refs/heads/ignite-2791
Commit: f97e709bfb3343a57e448fb8a25d2dbe0a625d7e
Parents: ecb2d58
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Mar 16 17:37:13 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Mar 16 17:37:13 2016 +0300
----------------------------------------------------------------------
.../GridCacheContinuousQueryConcurrentTest.java | 72 ++++++++++++++++----
1 file changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f97e709b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
index bab2bad..8803e8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -33,7 +33,6 @@ import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -49,7 +48,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static javax.cache.configuration.FactoryBuilder.factoryOf;
@@ -63,9 +61,6 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
/** */
private static final int NODES = 2;
- /** */
- public static final ExecutorService EXECUTOR_SERVICE = newSingleThreadExecutor();
-
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -92,9 +87,53 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
/**
* @throws Exception If failed.
*/
- public void testRegistration() throws Exception {
+ public void testReplicatedTx() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedAtomic() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTx() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionAtomic() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionTxWithoutBackup() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionAtomicWithoutBackup() throws Exception {
+ testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRegistration(CacheConfiguration ccfg) throws Exception {
+ ExecutorService execSrv = newSingleThreadExecutor();
+
try {
- final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(cacheConfiguration());
+ final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
for (int i = 0; i < 100; i++) {
log.info("Start iteration: " + i);
@@ -104,7 +143,7 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
final CountDownLatch latch = new CountDownLatch(1);
final int conQryCnt = 50;
- Future<List<IgniteFuture<String>>> fut = EXECUTOR_SERVICE.submit(
+ Future<List<IgniteFuture<String>>> fut = execSrv.submit(
new Callable<List<IgniteFuture<String>>>() {
@Override public List<IgniteFuture<String>> call() throws Exception {
int count = 0;
@@ -137,7 +176,7 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
}
}
finally {
- EXECUTOR_SERVICE.shutdownNow();
+ execSrv.shutdownNow();
}
}
@@ -210,16 +249,19 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
/**
+ * @param cacheMode Cache mode.
+ * @param atomicMode Atomicity mode.
+ * @param backups Backups.
* @return Cache configuration.
*/
- private CacheConfiguration<Integer, String> cacheConfiguration() {
+ private CacheConfiguration<Integer, String> cacheConfiguration(CacheMode cacheMode,
+ CacheAtomicityMode atomicMode, int backups) {
CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("test");
- cfg.setCacheMode(CacheMode.REPLICATED);
- cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ cfg.setCacheMode(cacheMode);
+ cfg.setAtomicityMode(atomicMode);
cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- cfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY);
- cfg.setReadFromBackup(false);
- cfg.setStartSize(1024);
+ cfg.setBackups(backups);
return cfg;
}
[2/2] ignite git commit: IGNITE-2791 Updated test.
Posted by nt...@apache.org.
IGNITE-2791 Updated test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b794817a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b794817a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b794817a
Branch: refs/heads/ignite-2791
Commit: b794817a7372853d3bca2307e6511006cd4d092e
Parents: f97e709
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Mar 16 19:11:52 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Mar 16 19:11:52 2016 +0300
----------------------------------------------------------------------
.../distributed/dht/GridClientPartitionTopology.java | 2 +-
.../distributed/dht/GridDhtPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 15 ++++++++-------
.../preloader/GridDhtPartitionsExchangeFuture.java | 8 ++++----
.../continuous/CacheContinuousQueryHandler.java | 2 +-
.../continuous/GridContinuousProcessor.java | 6 ++++--
...CacheContinuousQueryFailoverAbstractSelfTest.java | 2 +-
7 files changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index ad4943e..ce9ff64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -874,7 +874,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters(AffinityTopologyVersion topVer) {
lock.readLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 84889f8..fe3318c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -221,7 +221,7 @@ public interface GridDhtPartitionTopology {
/**
* @return Partition update counters.
*/
- public Map<Integer, Long> updateCounters();
+ public Map<Integer, Long> updateCounters(@Nullable AffinityTopologyVersion topVer);
/**
* @param part Partition to own.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0f89997..48b8d25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1316,20 +1316,21 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters(AffinityTopologyVersion topVer0) {
lock.readLock().lock();
try {
Map<Integer, Long> res = new HashMap<>(cntrMap);
for (GridDhtLocalPartition part : locParts.values()) {
- if (part.primary(topVer)) {
- Long cntr0 = res.get(part.id());
- Long cntr1 = part.updateCounter();
+ if (topVer0 != null && !part.primary(topVer0))
+ continue;
- if (cntr0 == null || cntr1 > cntr0)
- res.put(part.id(), cntr1);
- }
+ Long cntr0 = res.get(part.id());
+ Long cntr1 = part.updateCounter();
+
+ if (cntr0 == null || cntr1 > cntr0)
+ res.put(part.id(), cntr1);
}
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 68a05e4..a0e03eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -631,7 +631,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (top.cacheId() == cacheCtx.cacheId()) {
cacheCtx.topology().update(exchId,
top.partitionMap(true),
- top.updateCounters());
+ top.updateCounters(null));
break;
}
@@ -995,7 +995,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
- m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+ m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(null));
}
}
@@ -1037,7 +1037,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
- m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+ m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(null));
}
}
}
@@ -1046,7 +1046,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
- m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
+ m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(null));
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 10fbd89..3498470 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -299,7 +299,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
final GridCacheContext<K, V> cctx = cacheContext(ctx);
if (!internal && cctx != null && initUpdCntrs != null) {
- Map<Integer, Long> map = cctx.topology().updateCounters();
+ Map<Integer, Long> map = cctx.topology().updateCounters(cctx.affinity().affinityTopologyVersion());
for (Map.Entry<Integer, Long> e : map.entrySet()) {
Long cntr0 = initUpdCntrs.get(e.getKey());
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1776748..0eb22d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -227,7 +227,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (interCache != null && cntrs != null && interCache.context() != null
&& !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
- Map<Integer, Long> map = interCache.context().topology().updateCounters();
+ Map<Integer, Long> map = interCache.context().topology()
+ .updateCounters(interCache.context().affinity().affinityTopologyVersion());
for (Map.Entry<Integer, Long> e : map.entrySet()) {
Long cntr0 = cntrs.get(e.getKey());
@@ -924,7 +925,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal()) {
- Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
+ Map<Integer, Long> cntrs = cache.context().topology().updateCounters(
+ cache.context().affinity().affinityTopologyVersion());
req.addUpdateCounters(cntrs);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b794817a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index f104f21..d8f7257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -416,7 +416,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
Affinity<Object> aff = grid(i).affinity(null);
- Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters();
+ Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(null);
for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))