You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/24 15:40:57 UTC
[1/3] ignite git commit: IGNITE-GG-11048 WIP
Repository: ignite
Updated Branches:
refs/heads/ignite-11048 [created] 041f7c2b3
IGNITE-GG-11048 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27fc9d10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27fc9d10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27fc9d10
Branch: refs/heads/ignite-11048
Commit: 27fc9d10172f1803049623a9151b8b34aa84450a
Parents: fc9730a
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Mar 23 19:30:46 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Mar 23 19:30:46 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 18 +++---
...acheContinuousQueryRandomOperationsTest.java | 63 +++++++++++++++++++-
.../config/GridTestProperties.java | 4 +-
3 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/27fc9d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index fb6aeef..323cbd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2355,11 +2355,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(updated, newExpireTime, newTtl, newVer, true);
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
drReplicate(drType, updated, newVer, topVer);
recordNodeId(affNodeId, topVer);
@@ -2453,11 +2448,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
recordNodeId(affNodeId, topVer);
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
-
drReplicate(drType, null, newVer, topVer);
if (evt) {
@@ -2485,9 +2475,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
res = hadVal;
}
- if (res)
+ if (res) {
updateMetrics(op, metrics);
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+ }
+
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
if (intercept) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/27fc9d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index cdf4ffd..3d8d455 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
@@ -56,6 +58,7 @@ import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -77,6 +80,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
@@ -92,7 +96,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final int NODES = 5;
+ private static final int NODES = 2;
/** */
private static final int KEYS = 50;
@@ -414,6 +418,63 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
testContinuousQuery(ccfg, SERVER);
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemove() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ try {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+ final AtomicInteger cnrt = new AtomicInteger(0);
+
+ ContinuousQuery qry = new ContinuousQuery();
+
+ final List<Object> evts = new CopyOnWriteArrayList<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ for (Object o : iterable) {
+ evts.add(o);
+
+ cnrt.incrementAndGet();
+ }
+ }
+ });
+
+ try (QueryCursor qryCur = cache.query(qry)) {
+ cache.put(1, 1);
+
+ cache.remove(1);
+ cache.remove(1);
+ cache.remove(1);
+ cache.remove(1);
+
+ cache.put(1, 1);
+
+ cache.remove(1);
+ cache.remove(1);
+
+ cache.put(1, 1);
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return cnrt.get() == 5;
+ }
+ }, 5_000);
+ }
+ }
+ finally {
+ grid(0).destroyCache(ccfg.getName());
+ }
+ }
+
/**
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/27fc9d10/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
index d13f117..7bf26b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
@@ -58,7 +58,7 @@ public final class GridTestProperties {
public static final String TESTS_PROP_FILE = "tests.properties";
/** */
- public static final String TESTS_CFG_PATH = "modules/core/src/test/config";
+ public static final String TESTS_CFG_PATH = "D:\\projects\\ggprivate\\modules\\core\\src\\test\\config\\";
/** */
private static final Pattern PROP_REGEX = Pattern.compile("[@$]\\{[^@${}]+\\}");
@@ -94,7 +94,7 @@ public final class GridTestProperties {
}
// Load default properties.
- File cfgFile = getTestConfigurationFile(null, TESTS_PROP_FILE);
+ File cfgFile = new File("D:\\projects\\ggprivate\\modules\\core\\src\\test\\config\\tests.properties");
assert cfgFile.exists();
assert !cfgFile.isDirectory();
[3/3] ignite git commit: Merge branch 'master' into ignite-11048
Posted by nt...@apache.org.
Merge branch 'master' into ignite-11048
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/041f7c2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/041f7c2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/041f7c2b
Branch: refs/heads/ignite-11048
Commit: 041f7c2b3308b48ae58b62d8a3c0b11135d66b62
Parents: 060f4ce 34a9b66
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 24 16:41:28 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 24 16:41:28 2016 +0300
----------------------------------------------------------------------
.../store/CacheLoadOnlyStoreExample.java | 171 +++++++++++
examples/src/main/resources/person.csv | 20 ++
.../ignite/examples/CacheExamplesSelfTest.java | 8 +
.../igfs/IgfsIpcEndpointConfiguration.java | 28 ++
.../ignite/internal/binary/BinaryContext.java | 2 +
.../managers/communication/GridIoManager.java | 11 +
.../managers/communication/GridIoPolicy.java | 3 +
.../continuous/CacheContinuousQueryManager.java | 13 +-
.../continuous/GridContinuousProcessor.java | 2 +-
.../internal/processors/igfs/IgfsBlockKey.java | 30 +-
.../processors/igfs/IgfsDataManager.java | 29 +-
.../processors/igfs/IgfsDeleteWorker.java | 2 +-
.../processors/igfs/IgfsDirectoryInfo.java | 33 +-
.../internal/processors/igfs/IgfsEntryInfo.java | 8 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/igfs/IgfsIpcHandler.java | 81 +++--
.../internal/processors/igfs/IgfsProcessor.java | 11 +-
.../internal/processors/igfs/IgfsServer.java | 2 +-
.../internal/processors/igfs/IgfsUtils.java | 127 ++++++++
.../meta/IgfsMetaDirectoryCreateProcessor.java | 24 +-
.../IgfsMetaDirectoryListingAddProcessor.java | 6 +-
...gfsMetaDirectoryListingReplaceProcessor.java | 4 +-
.../igfs/meta/IgfsMetaFileCreateProcessor.java | 16 +-
.../meta/IgfsMetaUpdatePropertiesProcessor.java | 5 +-
.../cache/IgniteCacheAbstractTest.java | 6 +
.../IgniteCacheEntryListenerAbstractTest.java | 16 +-
.../distributed/IgniteCacheManyClientsTest.java | 6 +
...ContinuousQueryFailoverAbstractSelfTest.java | 8 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 11 +
...lockMessageSystemPoolStarvationSelfTest.java | 299 +++++++++++++++++++
.../IgfsPrimaryOptimziedMarshallerSelfTest.java | 28 ++
.../igfs/IgfsProcessorValidationSelfTest.java | 16 +
.../ignite/testsuites/IgniteIgfsTestSuite.java | 5 +
33 files changed, 950 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
[2/3] ignite git commit: IGNITE-11048 WIP
Posted by nt...@apache.org.
IGNITE-11048 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/060f4ce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/060f4ce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/060f4ce3
Branch: refs/heads/ignite-11048
Commit: 060f4ce353a47f968ad514a74aa71363235dd145
Parents: 27fc9d1
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 24 16:26:01 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 24 16:26:01 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 20 +-
.../dht/atomic/GridDhtAtomicCache.java | 4 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 5 -
...acheContinuousQueryRandomOperationsTest.java | 196 +++++++++++++++----
...inuousQueryRandomOperationsTwoNodesTest.java | 28 +++
5 files changed, 202 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 323cbd6..c5df29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2092,7 +2092,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateCntr0 == null ? 0 : updateCntr0);
+ 0);
}
}
else
@@ -2355,6 +2355,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
update(updated, newExpireTime, newTtl, newVer, true);
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
drReplicate(drType, updated, newVer, topVer);
recordNodeId(affNodeId, topVer);
@@ -2448,6 +2453,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
recordNodeId(affNodeId, topVer);
+ updateCntr0 = nextPartCounter(topVer);
+
+ if (updateCntr != null)
+ updateCntr0 = updateCntr;
+
drReplicate(drType, null, newVer, topVer);
if (evt) {
@@ -2475,15 +2485,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
res = hadVal;
}
- if (res) {
+ if (res)
updateMetrics(op, metrics);
- updateCntr0 = nextPartCounter(topVer);
-
- if (updateCntr != null)
- updateCntr0 = updateCntr;
- }
-
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
if (intercept) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e908c05..4938794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2115,7 +2115,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
}
}
- else if (lsnrs != null && updRes.success()) {
+ else if (lsnrs != null && updRes.updateCounter() != 0) {
ctx.continuousQueries().onEntryUpdated(
lsnrs,
entry.key(),
@@ -2868,7 +2868,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
- if (lsnrs != null && updRes.success()) {
+ if (lsnrs != null && updRes.updateCounter() != 0) {
ctx.continuousQueries().onEntryUpdated(
lsnrs,
entry.key(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index f104f21..0596cdb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -809,11 +809,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
checkBackupQueue(3, false);
}
- /** {@inheritDoc} */
- @Override public boolean isDebug() {
- return true;
- }
-
/**
* @param backups Number of backups.
* @param updateFromClient If {@code true} executes cache update from client node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 3d8d455..509d99d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -32,13 +32,13 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessor;
@@ -58,6 +58,7 @@ import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -80,7 +81,6 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
@@ -96,7 +96,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final int NODES = 2;
+ private static final int NODES = 5;
/** */
private static final int KEYS = 50;
@@ -125,11 +125,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- startGridsMultiThreaded(NODES - 1);
+ startGridsMultiThreaded(getClientIndex());
client = true;
- startGrid(NODES - 1);
+ startGrid(getClientIndex());
}
/** {@inheritDoc} */
@@ -418,64 +418,172 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
testContinuousQuery(ccfg, SERVER);
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemoveAtomicWithoutBackup() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testDoubleRemove(ccfg);
+ }
/**
* @throws Exception If failed.
*/
- public void testDoubleRemove() throws Exception {
+ public void testDoubleRemoveAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testDoubleRemove(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemoveAtomicOffheap() throws Exception {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
0,
ATOMIC,
+ OFFHEAP_TIERED,
+ false);
+
+ testDoubleRemove(ccfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemoveTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
ONHEAP_TIERED,
false);
- try {
- IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+ testDoubleRemove(ccfg);
+ }
- final AtomicInteger cnrt = new AtomicInteger(0);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemoveReplicatedTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
- ContinuousQuery qry = new ContinuousQuery();
+ testDoubleRemove(ccfg);
+ }
- final List<Object> evts = new CopyOnWriteArrayList<>();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemoveReplicatedAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
- qry.setLocalListener(new CacheEntryUpdatedListener() {
- @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
- for (Object o : iterable) {
- evts.add(o);
+ testDoubleRemove(ccfg);
+ }
- cnrt.incrementAndGet();
- }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDoubleRemove(CacheConfiguration ccfg) throws Exception {
+ IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg);
+
+ try {
+ ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+ final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts =
+ new CopyOnWriteArrayList<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+ evts.add(e);
}
});
+ QueryTestKey key = new QueryTestKey(1);
+
try (QueryCursor qryCur = cache.query(qry)) {
- cache.put(1, 1);
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ log.info("Start iteration: " + i);
- cache.remove(1);
- cache.remove(1);
- cache.remove(1);
- cache.remove(1);
+ cache.put(key, new QueryTestValue(1));
- cache.put(1, 1);
+ cache.remove(key);
+ cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>)
+ (Object)new EntrySetValueProcessor(null, false));
+ cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>)
+ (Object)new EntrySetValueProcessor(null, false));
+ cache.remove(key);
- cache.remove(1);
- cache.remove(1);
+ cache.put(key, new QueryTestValue(2));
- cache.put(1, 1);
+ cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>)
+ (Object)new EntrySetValueProcessor(null, false));
+ cache.remove(key);
- assert GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return cnrt.get() == 5;
- }
- }, 5_000);
+ cache.put(key, new QueryTestValue(3));
+ cache.put(key, new QueryTestValue(4));
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return evts.size() == 6;
+ }
+ }, 5_000);
+
+ checkSingleEvent(evts.get(0), EventType.CREATED, new QueryTestValue(1), null);
+ checkSingleEvent(evts.get(1), EventType.REMOVED, null, new QueryTestValue(1));
+ checkSingleEvent(evts.get(2), EventType.CREATED, new QueryTestValue(2), null);
+ checkSingleEvent(evts.get(3), EventType.REMOVED, null, new QueryTestValue(2));
+ checkSingleEvent(evts.get(4), EventType.CREATED, new QueryTestValue(3), null);
+ checkSingleEvent(evts.get(5), EventType.UPDATED, new QueryTestValue(4), new QueryTestValue(3));
+
+ cache.remove(key);
+ cache.remove(key);
+
+ evts.clear();
+
+ log.info("Finish iteration: " + i);
+ }
}
}
finally {
- grid(0).destroyCache(ccfg.getName());
+ grid(getClientIndex()).destroyCache(ccfg.getName());
}
}
/**
+ * @param event Event.
+ * @param type Event type.
+ * @param val Value.
+ * @param oldVal Old value.
+ */
+ private void checkSingleEvent(
+ CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event,
+ EventType type,
+ QueryTestValue val,
+ QueryTestValue oldVal) {
+ assertEquals(event.getEventType(), type);
+ assertEquals(event.getValue(), val);
+ assertEquals(event.getOldValue(), oldVal);
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testTxClient() throws Exception {
@@ -717,7 +825,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
evtsQueues.add(evtsQueue);
- QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
+ QueryCursor<?> cur = grid(getClientIndex()).cache(ccfg.getName()).query(qry);
curs.add(cur);
}
@@ -735,12 +843,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
evtsQueues.add(evtsQueue);
- QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
+ QueryCursor<?> cur = grid(rnd.nextInt(getClientIndex())).cache(ccfg.getName()).query(qry);
curs.add(cur);
}
else {
- for (int i = 0; i < NODES - 1; i++) {
+ for (int i = 0; i < getClientIndex(); i++) {
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
@@ -769,7 +877,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
if (i % 20 == 0)
log.info("Iteration: " + i);
- for (int idx = 0; idx < NODES; idx++)
+ for (int idx = 0; idx < getNodes(); idx++)
randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
}
}
@@ -784,6 +892,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
}
/**
+ * @return Client node index.
+ */
+ private int getClientIndex() {
+ return getNodes() - 1;
+ }
+
+ /**
+ * @return Count nodes.
+ */
+ protected int getNodes() {
+ return NODES;
+ }
+
+ /**
* @param rnd Random generator.
* @param evtsQueues Events queue.
* @param expData Expected cache data.
@@ -1333,9 +1455,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
*/
public static class QueryTestValue implements Serializable {
/** */
+ @GridToStringInclude
protected final Integer val1;
/** */
+ @GridToStringInclude
protected final String val2;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
new file mode 100644
index 0000000..2bbbb9f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+/**
+ *
+ */
+public class CacheContinuousQueryRandomOperationsTwoNodesTest extends CacheContinuousQueryRandomOperationsTest {
+ /** {@inheritDoc} */
+ @Override protected int getNodes() {
+ return 2;
+ }
+}