You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/16 16:35:10 UTC
ignite git commit: Fixed "IGNITE-2515 Make 'updateCntr' available
through CacheContinuousQueryEvent public API"
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5.7 dd6681046 -> 4de124c41
Fixed "IGNITE-2515 Make 'updateCntr' available through CacheContinuousQueryEvent public API"
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4de124c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4de124c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4de124c4
Branch: refs/heads/ignite-1.5.7
Commit: 4de124c41a30e8b4ee825c355686e16228a4390d
Parents: dd66810
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Feb 16 18:28:51 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Feb 16 18:30:05 2016 +0300
----------------------------------------------------------------------
.../cache/query/CacheQueryEntryEvent.java | 48 +
.../continuous/CacheContinuousQueryEvent.java | 17 +-
.../continuous/CacheContinuousQueryHandler.java | 96 +-
.../CacheContinuousQueryListener.java | 3 +-
.../continuous/CacheContinuousQueryManager.java | 27 +-
...CacheContinuousQueryCounterAbstractTest.java | 613 ++++++++++
...inuousQueryCounterPartitionedAtomicTest.java | 41 +
...ContinuousQueryCounterPartitionedTxTest.java | 41 +
...tinuousQueryCounterReplicatedAtomicTest.java | 41 +
...eContinuousQueryCounterReplicatedTxTest.java | 41 +
...acheContinuousQueryRandomOperationsTest.java | 1124 ++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 10 +
12 files changed, 2019 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
new file mode 100644
index 0000000..2c1c5e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.EventType;
+
+/**
+ * A Cache continuous query entry event.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
+public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> {
+ /**
+ * Constructs a cache entry event from a given cache as source.
+ *
+ * @param source the cache that originated the event
+ * @param eventType Event type.
+ */
+ public CacheQueryEntryEvent(Cache source, EventType eventType) {
+ super(source, eventType);
+ }
+
+ /**
+ * Each cache update increases partition counter. The same cache updates have on the same value of counter
+ * on primary and backup nodes. This value can be useful to communicate with external applications.
+ *
+ * @return Value of counter for this event.
+ */
+ public abstract long getPartitionUpdateCounter();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26e666..eab5dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import javax.cache.Cache;
-import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Continuous query event.
*/
-class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
/** */
private static final long serialVersionUID = 0L;
@@ -57,8 +57,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public K getKey() {
+ @Override public K getKey() {
return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
}
@@ -68,8 +67,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public V getOldValue() {
+ @Override public V getOldValue() {
return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.oldValue(), e.isKeepBinary(), false);
}
@@ -79,8 +77,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
+ @Override public long getPartitionUpdateCounter() {
+ return e.updateCounter();
+ }
+
+ /** {@inheritDoc} */
@Override public <T> T unwrap(Class<T> cls) {
- if(cls.isAssignableFrom(getClass()))
+ if (cls.isAssignableFrom(getClass()))
return cls.cast(this);
throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 115b67d..08fe62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -325,9 +325,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
// skipPrimaryCheck is set only when listen locally for replicated cache events.
assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
- boolean notify = true;
+ boolean notify = !evt.entry().isFiltered();
- if (rmtFilter != null) {
+ if (notify && rmtFilter != null) {
try {
notify = rmtFilter.evaluate(evt);
}
@@ -472,38 +472,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
}
- @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) {
- try {
- assert evt != null;
-
- CacheContinuousQueryEntry e = evt.entry();
-
- EntryBuffer buf = entryBufs.get(e.partition());
+ @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+ boolean primary) {
+ assert evt != null;
- if (buf == null) {
- buf = new EntryBuffer();
-
- EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
-
- if (oldRec != null)
- buf = oldRec;
- }
+ CacheContinuousQueryEntry e = evt.entry();
- e = buf.skipEntry(e);
-
- if (e != null && !ctx.localNodeId().equals(nodeId))
- ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
- }
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
+ e.markFiltered();
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
- }
+ onEntryUpdated(evt, primary, false);
}
@Override public void onPartitionEvicted(int part) {
@@ -618,20 +595,22 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
for (CacheContinuousQueryEntry e : entries)
entries0.addAll(handleEvent(ctx, e));
- Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
- new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
- @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
- return new CacheContinuousQueryEvent<>(cache, cctx, e);
- }
- },
- new IgnitePredicate<CacheContinuousQueryEntry>() {
- @Override public boolean apply(CacheContinuousQueryEntry entry) {
- return !entry.isFiltered();
+ if (!entries0.isEmpty()) {
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+ new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+ @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+ return new CacheContinuousQueryEvent<>(cache, cctx, e);
+ }
+ },
+ new IgnitePredicate<CacheContinuousQueryEntry>() {
+ @Override public boolean apply(CacheContinuousQueryEntry entry) {
+ return !entry.isFiltered();
+ }
}
- }
- );
+ );
- locLsnr.onUpdated(evts);
+ locLsnr.onUpdated(evts);
+ }
}
/**
@@ -731,11 +710,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param initCntr Update counters.
*/
public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
- assert topVer.topologyVersion() > 0 : topVer;
-
this.log = log;
if (initCntr != null) {
+ assert topVer.topologyVersion() > 0 : topVer;
+
this.lastFiredEvt = initCntr;
curTop = topVer;
@@ -878,33 +857,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
- * @param e Entry.
- * @return Continuous query entry.
- */
- private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
- if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
-
- e.markFiltered();
-
- return e;
- }
- else {
- buf.add(e.updateCounter());
-
- // Double check. If another thread sent a event with counter higher than this event.
- if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
- buf.remove(e.updateCounter());
-
- e.markFiltered();
-
- return e;
- }
- else
- return null;
- }
- }
-
- /**
* Add continuous entry.
*
* @param e Cache continuous query entry.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 86abbef..5246fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -67,8 +67,9 @@ interface CacheContinuousQueryListener<K, V> {
/**
* @param evt Event
* @param topVer Topology version.
+ * @param primary Primary
*/
- public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+ public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
/**
* @param part Partition.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8e4ff09..bb3c479 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -161,7 +161,25 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param updCntr Updated counter.
* @param topVer Topology version.
*/
- public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+ public void skipUpdateEvent(KeyCacheObject key,
+ int partId,
+ long updCntr,
+ AffinityTopologyVersion topVer) {
+ skipUpdateEvent(key, partId, updCntr, true, topVer);
+ }
+
+ /**
+ * @param key Entry key.
+ * @param partId Partition id.
+ * @param updCntr Updated counter.
+ * @param topVer Topology version.
+ * @param primary Primary.
+ */
+ public void skipUpdateEvent(KeyCacheObject key,
+ int partId,
+ long updCntr,
+ boolean primary,
+ AffinityTopologyVersion topVer) {
if (lsnrCnt.get() > 0) {
for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
@@ -178,7 +196,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
- lsnr.skipUpdateEvent(evt, topVer);
+ lsnr.skipUpdateEvent(evt, topVer, primary);
}
}
}
@@ -224,8 +242,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean hasNewVal = newVal != null;
boolean hasOldVal = oldVal != null;
- if (!hasNewVal && !hasOldVal)
+ if (!hasNewVal && !hasOldVal) {
+ skipUpdateEvent(key, partId, updateCntr, primary, topVer);
+
return;
+ }
EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
new file mode 100644
index 0000000..2ca9988
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Continuous queries counter tests.
+ */
+public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommonAbstractTest
+ implements Serializable {
+ /** */
+ protected static final String CACHE_NAME = "test_cache";
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Latch timeout. */
+ protected static final long LATCH_TIMEOUT = 5000;
+
+ /** */
+ private static final String NO_CACHE_GRID_NAME = "noCacheGrid";
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(peerClassLoadingEnabled());
+
+ if (gridName.equals(NO_CACHE_GRID_NAME))
+ cfg.setClientMode(true);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ @NotNull private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setName(CACHE_NAME);
+ cacheCfg.setCacheMode(cacheMode());
+ cacheCfg.setAtomicityMode(atomicityMode());
+ cacheCfg.setNearConfiguration(nearConfiguration());
+ cacheCfg.setRebalanceMode(ASYNC);
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ cacheCfg.setCacheStoreFactory(new StoreFactory());
+ cacheCfg.setReadThrough(true);
+ cacheCfg.setWriteThrough(true);
+ cacheCfg.setLoadPreviousValue(true);
+
+ return cacheCfg;
+ }
+
+ /**
+ * @return Peer class loading enabled flag.
+ */
+ protected boolean peerClassLoadingEnabled() {
+ return true;
+ }
+
+ /**
+ * @return Distribution.
+ */
+ protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ for (int i = 0; i < gridCount(); i++) {
+ if (grid(i).cluster().nodes().size() != gridCount())
+ return false;
+ }
+
+ return true;
+ }
+ }, 3000);
+
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).destroyCache(CACHE_NAME);
+
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).getOrCreateCache(cacheConfiguration());
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /**
+ * @return Grids count.
+ */
+ protected abstract int gridCount();
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAllEntries() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+ final CountDownLatch latch = new CountDownLatch(5);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ synchronized (map) {
+ List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(), e
+ .unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+ }
+
+ latch.countDown();
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+ cache.put(1, 1);
+ cache.put(2, 2);
+ cache.put(3, 3);
+
+ cache.remove(2);
+
+ cache.put(1, 10);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+ assertEquals(3, map.size());
+
+ List<T2<Integer, Long>> vals = map.get(1);
+
+ assertNotNull(vals);
+ assertEquals(2, vals.size());
+ assertEquals(1, (int)vals.get(0).get1());
+ assertEquals(1L, (long)vals.get(0).get2());
+ assertEquals(10, (int)vals.get(1).get1());
+ assertEquals(2L, (long)vals.get(1).get2());
+
+ vals = map.get(2);
+
+ assertNotNull(vals);
+ assertEquals(2, vals.size());
+ assertEquals(2, (int)vals.get(0).get1());
+ assertEquals(1L, (long)vals.get(0).get2());
+ assertNull(vals.get(1).get1());
+
+ vals = map.get(3);
+
+ assertNotNull(vals);
+ assertEquals(1, vals.size());
+ assertEquals(3, (int)vals.get(0).get1());
+ assertEquals(1L, (long)vals.get(0).get2());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTwoQueryListener() throws Exception {
+ if (cacheMode() == LOCAL)
+ return;
+
+ final IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+ final IgniteCache<Integer, Integer> cache1 = grid(1).cache(CACHE_NAME);
+
+ final AtomicInteger cntr = new AtomicInteger(0);
+ final AtomicInteger cntr1 = new AtomicInteger(0);
+
+ final ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+ final ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+ final Map<Integer, List<T2<Integer, Long>>> map1 = new HashMap<>();
+ final Map<Integer, List<T2<Integer, Long>>> map2 = new HashMap<>();
+
+ qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ cntr.incrementAndGet();
+
+ synchronized (map1) {
+ List<T2<Integer, Long>> vals = map1.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map1.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(),
+ e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+ }
+ }
+ }
+ });
+
+ qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ cntr1.incrementAndGet();
+
+ synchronized (map2) {
+ List<T2<Integer, Long>> vals = map2.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map2.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(),
+ e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+ }
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
+ QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Object, Object> cache0 = grid(i).cache(CACHE_NAME);
+
+ cache0.put(1, 1);
+ cache0.put(2, 2);
+ cache0.put(3, 3);
+
+ cache0.remove(1);
+ cache0.remove(2);
+ cache0.remove(3);
+
+ final int iter = i + 1;
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return iter * 6 /* count operation */ * 2 /* count continues queries*/
+ == (cntr.get() + cntr1.get());
+ }
+ }, 5000L);
+
+ checkEvents(map1, i);
+
+ map1.clear();
+
+ checkEvents(map2, i);
+
+ map2.clear();
+ }
+ }
+ }
+
+ /**
+ * @param evnts Events.
+ * @param iter Iteration.
+ */
+ private void checkEvents(Map<Integer, List<T2<Integer, Long>>> evnts, long iter) {
+ List<T2<Integer, Long>> val = evnts.get(1);
+
+ assertEquals(val.size(), 2);
+
+ // Check put 1
+ assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+ assertEquals(1L, (long)val.get(0).get1());
+
+ // Check remove 1
+ assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+ assertNull(val.get(1).get1());
+
+ val = evnts.get(2);
+
+ assertEquals(val.size(), 2);
+
+ // Check put 2
+ assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+ assertEquals(2L, (long)val.get(0).get1());
+
+ // Check remove 2
+ assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+ assertNull(val.get(1).get1());
+
+ val = evnts.get(3);
+
+ assertEquals(val.size(), 2);
+
+ // Check put 3
+ assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+ assertEquals(3L, (long)val.get(0).get1());
+
+ // Check remove 3
+ assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+ assertNull(val.get(1).get1());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestartQuery() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ final int keyCnt = 300;
+
+ final int updateKey = 1;
+
+ for (int i = 0; i < keyCnt; i++)
+ cache.put(updateKey, i);
+
+ for (int i = 0; i < 10; i++) {
+ if (i % 2 == 0) {
+ final AtomicInteger cntr = new AtomicInteger(0);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final List<T2<Integer, Long>> vals = new ArrayList<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(
+ Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ synchronized (vals) {
+ cntr.incrementAndGet();
+
+ vals.add(new T2<>(e.getValue(),
+ e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+ }
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache.query(qry)) {
+ for (int key = 0; key < keyCnt; key++)
+ cache.put(updateKey, cache.get(updateKey) + 1);
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return cntr.get() == keyCnt;
+ }
+ }, 2000L);
+
+ synchronized (vals) {
+ for (T2<Integer, Long> val : vals)
+ assertEquals((long)val.get1() + 1, (long)val.get2());
+ }
+ }
+ }
+ else {
+ for (int key = 0; key < keyCnt; key++)
+ cache.put(updateKey, cache.get(updateKey) + 1);
+
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEntriesByFilter() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+ final CountDownLatch latch = new CountDownLatch(8);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ synchronized (map) {
+ List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(),
+ e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+ }
+
+ latch.countDown();
+ }
+ }
+ });
+
+ qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer,Integer>() {
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+ return evt.getValue() % 2 == 0;
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+ cache.put(1, 1);
+ cache.put(1, 2);
+ cache.put(1, 3);
+ cache.put(1, 4);
+
+ cache.put(2, 1);
+ cache.put(2, 2);
+ cache.put(2, 3);
+ cache.put(2, 4);
+
+ cache.remove(1);
+ cache.remove(2);
+
+ cache.put(1, 10);
+ cache.put(2, 40);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+ assertEquals(2, map.size());
+
+ List<T2<Integer, Long>> vals = map.get(1);
+
+ assertNotNull(vals);
+ assertEquals(4, vals.size());
+
+ assertEquals((int)vals.get(0).get1(), 2);
+ assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+ assertEquals((int)vals.get(1).get1(), 4);
+ assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+ assertNull(vals.get(2).get1());
+ assertEquals(5, (long)vals.get(2).get2());
+
+ assertEquals((int)vals.get(3).get1(), 10);
+ assertEquals(6, (long)vals.get(3).get2());
+
+ vals = map.get(2);
+
+ assertNotNull(vals);
+ assertEquals(4, vals.size());
+
+ assertEquals((int)vals.get(0).get1(), 2);
+ assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+ assertEquals((int)vals.get(1).get1(), 4);
+ assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+ assertNull(vals.get(2).get1());
+ assertEquals(5, (long)vals.get(2).get2());
+
+ assertEquals((int)vals.get(3).get1(), 40);
+ assertEquals(6, (long)vals.get(3).get2());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCache() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final Map<Integer, T2<Integer, Long>> map = new ConcurrentHashMap8<>();
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ map.put(e.getKey(), new T2<>(e.getValue(),
+ e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+
+ latch.countDown();
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+ cache.loadCache(null, 0);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount();
+
+ assertEquals(10, map.size());
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(i, (int)map.get(i).get1());
+ assertEquals((long)1, (long)map.get(i).get2());
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class StoreFactory implements Factory<CacheStore> {
+ @Override public CacheStore create() {
+ return new TestStore();
+ }
+ }
+
+ /**
+ * Store.
+ */
+ private static class TestStore extends CacheStoreAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+ for (int i = 0; i < 10; i++)
+ clo.apply(i, i);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object load(Object key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
new file mode 100644
index 0000000..7b97928
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
new file mode 100644
index 0000000..aa42832
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedTxTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
new file mode 100644
index 0000000..afa7a22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
new file mode 100644
index 0000000..4ee12de
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedTxTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
new file mode 100644
index 0000000..dc86f4e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -0,0 +1,1124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private static final int KEYS = 50;
+
+ /** */
+ private static final int VALS = 10;
+
+ /** */
+ public static final int ITERATION_CNT = 100;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES - 1);
+
+ client = true;
+
+ startGrid(NODES - 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomic() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicAllNodes() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicatedAllNodes() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoBackups() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoBackupsAllNodes() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoBackupsClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTx() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxAllNodes() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClientExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicated() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackups() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackupsAllNodes() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, ALL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackupsExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, SERVER);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackupsClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, CLIENT);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param deploy The place where continuous query will be started.
+ * @throws Exception If failed.
+ */
+ private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ throws Exception {
+ ignite(0).createCache(ccfg);
+
+ try {
+ long seed = System.currentTimeMillis();
+
+ Random rnd = new Random(seed);
+
+ log.info("Random seed: " + seed);
+
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+ Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+ if (deploy == CLIENT) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ evtsQueues.add(evtsQueue);
+
+ QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
+
+ curs.add(cur);
+ }
+ else if (deploy == SERVER) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ evtsQueues.add(evtsQueue);
+
+ QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
+
+ curs.add(cur);
+ }
+ else {
+ for (int i = 0; i < NODES - 1; i++) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ evtsQueues.add(evtsQueue);
+
+ QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry);
+
+ curs.add(cur);
+ }
+ }
+
+ ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+ Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+ try {
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ if (i % 20 == 0)
+ log.info("Iteration: " + i);
+
+ for (int idx = 0; idx < NODES; idx++)
+ randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+ }
+ }
+ finally {
+ for (QueryCursor<?> cur : curs)
+ cur.close();
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @param evtsQueues Events queue.
+ * @param expData Expected cache data.
+ * @param partCntr Partition counter.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void randomUpdate(
+ Random rnd,
+ List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ ConcurrentMap<Object, Object> expData,
+ Map<Integer, Long> partCntr,
+ IgniteCache<Object, Object> cache)
+ throws Exception {
+ Object key = new QueryTestKey(rnd.nextInt(KEYS));
+ Object newVal = value(rnd);
+ Object oldVal = expData.get(key);
+
+ int op = rnd.nextInt(13);
+
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ Transaction tx = null;
+
+ if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+ tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+ try {
+ // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 1: {
+ cache.getAndPut(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 2: {
+ cache.remove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: {
+ cache.getAndRemove(key);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+
+ break;
+ }
+
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
+ }
+
+ case 6: {
+ cache.putIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueues);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+ }
+ else {
+ cache.replace(key, value(rnd), newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueues);
+ }
+
+ break;
+ }
+
+ case 11: {
+ SortedMap<Object, Object> vals = new TreeMap<>();
+
+ while (vals.size() < KEYS / 5)
+ vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd));
+
+ cache.putAll(vals);
+
+ if (tx != null)
+ tx.commit();
+
+ for (Map.Entry<Object, Object> e : vals.entrySet())
+ updatePartitionCounter(cache, e.getKey(), partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
+
+ expData.putAll(vals);
+
+ break;
+ }
+
+ case 12: {
+ SortedMap<Object, Object> vals = new TreeMap<>();
+
+ while (vals.size() < KEYS / 5)
+ vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal);
+
+ cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+ if (tx != null)
+ tx.commit();
+
+ for (Map.Entry<Object, Object> e : vals.entrySet())
+ updatePartitionCounter(cache, e.getKey(), partCntr);
+
+ waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
+
+ for (Object o : vals.keySet())
+ expData.put(o, newVal);
+
+ break;
+ }
+
+ default:
+ fail("Op:" + op);
+ }
+ } finally {
+ if (tx != null)
+ tx.close();
+ }
+ }
+
+ /**
+ * @param evtsQueues Queue.
+ * @param partCntrs Counters.
+ * @param aff Affinity.
+ * @param vals Values.
+ * @param expData Expected data.
+ */
+ private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ Map<Integer, Long> partCntrs,
+ Affinity<Object> aff,
+ SortedMap<Object, Object> vals,
+ Map<Object, Object> expData)
+ throws Exception {
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>();
+
+ for (int i = 0; i < vals.size(); i++) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ rcvEvts.put(evt.getKey(), evt);
+ }
+
+ assertEquals(vals.size(), rcvEvts.size());
+
+ for (Map.Entry<Object, Object> e : vals.entrySet()) {
+ Object key = e.getKey();
+ Object val = e.getValue();
+ Object oldVal = expData.get(key);
+
+ if (val == null && oldVal == null) {
+ checkNoEvent(evtsQueues);
+
+ continue;
+ }
+
+ CacheEntryEvent evt = rcvEvts.get(key);
+
+ assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']',
+ evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+
+ long cntr = partCntrs.get(aff.partition(key));
+ CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class);
+
+ assertNotNull(cntr);
+ assertNotNull(qryEntryEvt);
+
+ assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+ }
+ }
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionIsolation}.
+ */
+ private TransactionIsolation txRandomIsolation(Random rnd) {
+ int val = rnd.nextInt(3);
+
+ if (val == 0)
+ return READ_COMMITTED;
+ else if (val == 1)
+ return REPEATABLE_READ;
+ else
+ return SERIALIZABLE;
+ }
+
+ /**
+ * @param rnd {@link Random}.
+ * @return {@link TransactionConcurrency}.
+ */
+ private TransactionConcurrency txRandomConcurrency(Random rnd) {
+ return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key
+ * @param cntrs Partition counters.
+ */
+ private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+ Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+ int part = aff.partition(key);
+
+ Long partCntr = cntrs.get(part);
+
+ if (partCntr == null)
+ partCntr = 0L;
+
+ cntrs.put(part, ++partCntr);
+ }
+
+ /**
+ * @param rnd Random generator.
+ * @return Cache value.
+ */
+ private static Object value(Random rnd) {
+ return new QueryTestValue(rnd.nextInt(VALS));
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @param partCntrs Partition counters.
+ * @param aff Affinity function.
+ * @param key Key.
+ * @param val Value.
+ * @param oldVal Old value.
+ * @throws Exception If failed.
+ */
+ private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+ Map<Integer, Long> partCntrs,
+ Affinity<Object> aff,
+ Object key,
+ Object val,
+ Object oldVal)
+ throws Exception {
+ if (val == null && oldVal == null) {
+ checkNoEvent(evtsQueues);
+
+ return;
+ }
+
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+ assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+ assertEquals(key, evt.getKey());
+ assertEquals(val, evt.getValue());
+ assertEquals(oldVal, evt.getOldValue());
+
+ long cntr = partCntrs.get(aff.partition(key));
+ CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+ assertNotNull(cntr);
+ assertNotNull(qryEntryEvt);
+
+ assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+ }
+ }
+
+ /**
+ * @param evtsQueues Event queue.
+ * @throws Exception If failed.
+ */
+ private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+ for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+ CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+ assertNull(evt);
+ }
+ }
+
+ /**
+ *
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param atomicityMode Cache atomicity mode.
+ * @param memoryMode Cache memory mode.
+ * @param store If {@code true} configures dummy cache store.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ boolean store) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setMemoryMode(memoryMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ if (store) {
+ ccfg.setCacheStoreFactory(new TestStoreFactory());
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+ }
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+
+ /**
+ *
+ */
+ static class QueryTestKey implements Serializable, Comparable {
+ /** */
+ private final Integer key;
+
+ /**
+ * @param key Key.
+ */
+ public QueryTestKey(Integer key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryTestKey that = (QueryTestKey)o;
+
+ return key.equals(that.key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return key.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTestKey.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(Object o) {
+ return key - ((QueryTestKey)o).key;
+ }
+ }
+
+ /**
+ *
+ */
+ static class QueryTestValue implements Serializable {
+ /** */
+ private final Integer val1;
+
+ /** */
+ private final String val2;
+
+ /**
+ * @param val Value.
+ */
+ public QueryTestValue(Integer val) {
+ this.val1 = val;
+ this.val2 = String.valueOf(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueryTestValue that = (QueryTestValue) o;
+
+ return val1.equals(that.val1) && val2.equals(that.val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = val1.hashCode();
+
+ res = 31 * res + val2.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTestValue.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+ /** */
+ private Object val;
+
+ /** */
+ private boolean retOld;
+
+ /**
+ * @param val Value to set.
+ * @param retOld Return old value flag.
+ */
+ public EntrySetValueProcessor(Object val, boolean retOld) {
+ this.val = val;
+ this.retOld = retOld;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+ Object old = retOld ? e.getValue() : null;
+
+ if (val != null)
+ e.setValue(val);
+ else
+ e.remove();
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EntrySetValueProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ protected enum ContinuousDeploy {
+ CLIENT, SERVER, ALL
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 4dfe741..4136d21 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -71,12 +71,17 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
@@ -201,6 +206,11 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterPartitionedAtomicTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterPartitionedTxTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterReplicatedAtomicTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterReplicatedTxTest.class);
+ suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
suite.addTestSuite(CacheContinuousBatchAckTest.class);
suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);