You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/10 19:18:49 UTC
[1/2] ignite git commit: IGNITE-2515 Implemented that unwrap(Long)
returns update counter. Added tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-2515 [created] 301abcfa6
IGNITE-2515 Implemented that unwrap(Long) returns update counter. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4252c5db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4252c5db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4252c5db
Branch: refs/heads/ignite-2515
Commit: 4252c5db69161dcff56b1b0690b3c992bbb904ba
Parents: d08a779
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Feb 9 16:12:16 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 10 18:22:12 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryEvent.java | 10 +-
...CacheContinuousQueryCounterAbstractTest.java | 614 +++++++++++++++++++
...inuousQueryCounterPartitionedAtomicTest.java | 41 ++
...ContinuousQueryCounterPartitionedTxTest.java | 41 ++
...tinuousQueryCounterReplicatedAtomicTest.java | 41 ++
...eContinuousQueryCounterReplicatedTxTest.java | 41 ++
.../IgniteCacheQuerySelfTestSuite.java | 8 +
7 files changed, 791 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26e666..d1c7c28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -57,8 +57,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public K getKey() {
+ @Override public K getKey() {
return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
}
@@ -68,8 +67,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public V getOldValue() {
+ @Override public V getOldValue() {
return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.oldValue(), e.isKeepBinary(), false);
}
@@ -80,8 +78,10 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
/** {@inheritDoc} */
@Override public <T> T unwrap(Class<T> cls) {
- if(cls.isAssignableFrom(getClass()))
+ if (cls.isAssignableFrom(getClass()))
return cls.cast(this);
+ else if (cls == Long.class)
+ return cls.cast(e.updateCounter());
throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
new file mode 100644
index 0000000..3a10e1c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKeyImpl;
+import org.apache.ignite.internal.util.typedef.P2;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Continuous queries counter tests.
+ */
+public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommonAbstractTest
+ implements Serializable {
+ /** */
+ protected static final String CACHE_NAME = "test_cache";
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Latch timeout. */
+ protected static final long LATCH_TIMEOUT = 5000;
+
+ /** */
+ private static final String NO_CACHE_GRID_NAME = "noCacheGrid";
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(peerClassLoadingEnabled());
+
+ if (gridName.equals(NO_CACHE_GRID_NAME))
+ cfg.setClientMode(true);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ @NotNull private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setName(CACHE_NAME);
+ cacheCfg.setCacheMode(cacheMode());
+ cacheCfg.setAtomicityMode(atomicityMode());
+ cacheCfg.setNearConfiguration(nearConfiguration());
+ cacheCfg.setRebalanceMode(ASYNC);
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ cacheCfg.setCacheStoreFactory(new StoreFactory());
+ cacheCfg.setReadThrough(true);
+ cacheCfg.setWriteThrough(true);
+ cacheCfg.setLoadPreviousValue(true);
+
+ return cacheCfg;
+ }
+
+ /**
+ * @return Peer class loading enabled flag.
+ */
+ protected boolean peerClassLoadingEnabled() {
+ return true;
+ }
+
+ /**
+ * @return Near cache configuration to set on the test cache.
+ */
+ protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ for (int i = 0; i < gridCount(); i++) {
+ if (grid(i).cluster().nodes().size() != gridCount())
+ return false;
+ }
+
+ return true;
+ }
+ }, 3000);
+
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).destroyCache(CACHE_NAME);
+
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).getOrCreateCache(cacheConfiguration());
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /**
+ * @return Grids count.
+ */
+ protected abstract int gridCount();
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAllEntries() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+ final CountDownLatch latch = new CountDownLatch(5);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ synchronized (map) {
+ List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+ }
+
+ latch.countDown();
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+ cache.put(1, 1);
+ cache.put(2, 2);
+ cache.put(3, 3);
+
+ cache.remove(2);
+
+ cache.put(1, 10);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+ assertEquals(3, map.size());
+
+ List<T2<Integer, Long>> vals = map.get(1);
+
+ assertNotNull(vals);
+ assertEquals(2, vals.size());
+ assertEquals(1, (int)vals.get(0).get1());
+ assertEquals(1L, (long)vals.get(0).get2());
+ assertEquals(10, (int)vals.get(1).get1());
+ assertEquals(2L, (long)vals.get(1).get2());
+
+ vals = map.get(2);
+
+ assertNotNull(vals);
+ assertEquals(2, vals.size());
+ assertEquals(2, (int)vals.get(0).get1());
+ assertEquals(1L, (long)vals.get(0).get2());
+ assertNull(vals.get(1).get1());
+
+ vals = map.get(3);
+
+ assertNotNull(vals);
+ assertEquals(1, vals.size());
+ assertEquals(3, (int)vals.get(0).get1());
+ assertEquals(1L, (long)vals.get(0).get2());
+ }
+ }
+
+ /**
+ * Registers one continuous query through grid 0 and another through grid 1, then, from every grid in turn,
+ * puts and removes keys 1, 2 and 3 and checks the value and update counter of each event that both
+ * listeners recorded. Does nothing when the cache mode is LOCAL.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTwoQueryListener() throws Exception {
+ if (cacheMode() == LOCAL)
+ return;
+
+ final IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+ final IgniteCache<Integer, Integer> cache1 = grid(1).cache(CACHE_NAME);
+
+ final AtomicInteger cntr = new AtomicInteger(0);
+ final AtomicInteger cntr1 = new AtomicInteger(0);
+
+ final ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+ final ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+ final Map<Integer, List<T2<Integer, Long>>> map1 = new HashMap<>();
+ final Map<Integer, List<T2<Integer, Long>>> map2 = new HashMap<>();
+
+ qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ cntr.incrementAndGet();
+
+ synchronized (map1) {
+ List<T2<Integer, Long>> vals = map1.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map1.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+ }
+ }
+ }
+ });
+
+ qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ cntr1.incrementAndGet();
+
+ synchronized (map2) {
+ List<T2<Integer, Long>> vals = map2.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map2.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+ }
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
+ QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Object, Object> cache0 = grid(i).cache(CACHE_NAME);
+
+ cache0.put(1, 1);
+ cache0.put(2, 2);
+ cache0.put(3, 3);
+
+ cache0.remove(1);
+ cache0.remove(2);
+ cache0.remove(3);
+
+ final int iter = i + 1;
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return iter * 6 /* operations per iteration */ * 2 /* number of continuous queries */
+ == (cntr.get() + cntr1.get());
+ }
+ }, 5000L);
+
+ checkEvents(map1, i);
+
+ map1.clear();
+
+ checkEvents(map2, i);
+
+ map2.clear();
+ }
+ }
+ }
+
+    /**
+     * Verifies the events recorded for keys 1, 2 and 3 during one iteration of put-then-remove.
+     * For every key exactly two events are expected: a put carrying the key as its value and update
+     * counter {@code iter * 2 + 1}, followed by a remove carrying a {@code null} value and update
+     * counter {@code iter * 2 + 2}.
+     *
+     * @param evnts Events grouped by key, in arrival order.
+     * @param iter Zero-based iteration number.
+     */
+    private void checkEvents(Map<Integer, List<T2<Integer, Long>>> evnts, long iter) {
+        for (int key = 1; key <= 3; key++) {
+            List<T2<Integer, Long>> val = evnts.get(key);
+
+            // Fail with a readable message instead of a bare NPE when a key produced no events.
+            assertNotNull("No events recorded for key: " + key, val);
+
+            // Expected value goes first so that a failure message reads correctly.
+            assertEquals(2, val.size());
+
+            // Check put.
+            assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+            assertEquals((long)key, (long)val.get(0).get1());
+
+            // Check remove.
+            assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+            assertNull(val.get(1).get1());
+        }
+    }
+
+    /**
+     * Repeatedly updates a single key, with a continuous query registered on every even iteration and
+     * no query on every odd one, and checks that each event observed by a freshly registered query
+     * carries an update counter equal to its value plus one.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRestartQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        final int keyCnt = 300;
+
+        final int updateKey = 1;
+
+        for (int i = 0; i < keyCnt; i++)
+            cache.put(updateKey, i);
+
+        for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+                final AtomicInteger cntr = new AtomicInteger(0);
+
+                ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+                final List<T2<Integer, Long>> vals = new ArrayList<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+                    @Override public void onUpdated(
+                        Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                            synchronized (vals) {
+                                vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+                            }
+
+                            // Publish the count only after the event is stored, so a waiter that
+                            // sees the final count also sees the complete list.
+                            cntr.incrementAndGet();
+                        }
+                    }
+                });
+
+                try (QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache.query(qry)) {
+                    for (int key = 0; key < keyCnt; key++)
+                        cache.put(updateKey, cache.get(updateKey) + 1);
+
+                    // assertTrue (not the 'assert' keyword) so the wait runs even without -ea.
+                    assertTrue(GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return cntr.get() == keyCnt;
+                        }
+                    }, 2000L));
+
+                    synchronized (vals) {
+                        // Checked once, outside the loop, so an empty list cannot pass vacuously.
+                        assertEquals(keyCnt, vals.size());
+
+                        for (T2<Integer, Long> val : vals)
+                            assertEquals((long)val.get1() + 1, (long)val.get2());
+                    }
+                }
+            }
+            else {
+                for (int key = 0; key < keyCnt; key++)
+                    cache.put(updateKey, cache.get(updateKey) + 1);
+            }
+        }
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEntriesByFilter() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+ final CountDownLatch latch = new CountDownLatch(8);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ synchronized (map) {
+ List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+ if (vals == null) {
+ vals = new ArrayList<>();
+
+ map.put(e.getKey(), vals);
+ }
+
+ vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+ }
+
+ latch.countDown();
+ }
+ }
+ });
+
+ qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer,Integer>() {
+ @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+ return evt.getValue() % 2 == 0;
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+ cache.put(1, 1);
+ cache.put(1, 2);
+ cache.put(1, 3);
+ cache.put(1, 4);
+
+ cache.put(2, 1);
+ cache.put(2, 2);
+ cache.put(2, 3);
+ cache.put(2, 4);
+
+ cache.remove(1);
+ cache.remove(2);
+
+ cache.put(1, 10);
+ cache.put(2, 40);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+ assertEquals(2, map.size());
+
+ List<T2<Integer, Long>> vals = map.get(1);
+
+ assertNotNull(vals);
+ assertEquals(4, vals.size());
+
+ assertEquals((int)vals.get(0).get1(), 2);
+ assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+ assertEquals((int)vals.get(1).get1(), 4);
+ assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+ assertNull(vals.get(2).get1());
+ assertEquals(5, (long)vals.get(2).get2());
+
+ assertEquals((int)vals.get(3).get1(), 10);
+ assertEquals(6, (long)vals.get(3).get2());
+
+ vals = map.get(2);
+
+ assertNotNull(vals);
+ assertEquals(4, vals.size());
+
+ assertEquals((int)vals.get(0).get1(), 2);
+ assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+ assertEquals((int)vals.get(1).get1(), 4);
+ assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+ assertNull(vals.get(2).get1());
+ assertEquals(5, (long)vals.get(2).get2());
+
+ assertEquals((int)vals.get(3).get1(), 40);
+ assertEquals(6, (long)vals.get(3).get2());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCache() throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ final Map<Integer, T2<Integer, Long>> map = new ConcurrentHashMap8<>();
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+ map.put(e.getKey(), new T2<>(e.getValue(), e.unwrap(Long.class)));
+
+ latch.countDown();
+ }
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+ cache.loadCache(null, 0);
+
+ assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount();
+
+ assertEquals(10, map.size());
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(i, (int)map.get(i).get1());
+ assertEquals((long)1, (long)map.get(i).get2());
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class StoreFactory implements Factory<CacheStore> {
+ @Override public CacheStore create() {
+ return new TestStore();
+ }
+ }
+
+ /**
+ * Store.
+ */
+ private static class TestStore extends CacheStoreAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+ for (int i = 0; i < 10; i++)
+ clo.apply(i, i);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object load(Object key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
new file mode 100644
index 0000000..7b97928
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
new file mode 100644
index 0000000..aa42832
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedTxTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
new file mode 100644
index 0000000..afa7a22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
new file mode 100644
index 0000000..4ee12de
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Continuous query update counter tests for a transactional cache in replicated mode, run on two grids.
+ */
+public class CacheContinuousQueryCounterReplicatedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        // Must be REPLICATED: returning PARTITIONED would only repeat the partitioned tx suite.
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..e161b3a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,6 +70,10 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
@@ -206,6 +210,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterPartitionedAtomicTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterPartitionedTxTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterReplicatedAtomicTest.class);
+ suite.addTestSuite(CacheContinuousQueryCounterReplicatedTxTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
[2/2] ignite git commit: IGNITE-2515 Improvement tests.
Posted by nt...@apache.org.
IGNITE-2515 Improvement tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/301abcfa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/301abcfa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/301abcfa
Branch: refs/heads/ignite-2515
Commit: 301abcfa63290ac27407979c33849663f14f5d9b
Parents: 4252c5d
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Feb 10 21:18:05 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 10 21:18:56 2016 +0300
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryHandler.java | 91 +---
.../CacheContinuousQueryListener.java | 5 +-
.../continuous/CacheContinuousQueryManager.java | 50 +-
...acheContinuousQueryRandomOperationsTest.java | 543 +++++++++++++++----
4 files changed, 499 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cf9b439..9f725a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -327,7 +327,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean notify = true;
- if (rmtFilter != null) {
+ if (!evt.entry().isFiltered() && rmtFilter != null) {
try {
notify = rmtFilter.evaluate(evt);
}
@@ -472,38 +472,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
}
- @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) {
- try {
- assert evt != null;
-
- CacheContinuousQueryEntry e = evt.entry();
-
- EntryBuffer buf = entryBufs.get(e.partition());
-
- if (buf == null) {
- buf = new EntryBuffer();
-
- EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+ @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+ boolean primary, boolean recordIgniteEvt) {
+ assert evt != null;
- if (oldRec != null)
- buf = oldRec;
- }
-
- e = buf.skipEntry(e);
+ CacheContinuousQueryEntry e = evt.entry();
- if (e != null && !ctx.localNodeId().equals(nodeId))
- ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
- }
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
+ e.markFiltered();
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
- }
+ onEntryUpdated(evt, primary, recordIgniteEvt);
}
@Override public void onPartitionEvicted(int part) {
@@ -618,20 +595,22 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
for (CacheContinuousQueryEntry e : entries)
entries0.addAll(handleEvent(ctx, e));
- Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
- new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
- @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
- return new CacheContinuousQueryEvent<>(cache, cctx, e);
- }
- },
- new IgnitePredicate<CacheContinuousQueryEntry>() {
- @Override public boolean apply(CacheContinuousQueryEntry entry) {
- return !entry.isFiltered();
+ if (!entries0.isEmpty()) {
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+ new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+ @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+ return new CacheContinuousQueryEvent<>(cache, cctx, e);
+ }
+ },
+ new IgnitePredicate<CacheContinuousQueryEntry>() {
+ @Override public boolean apply(CacheContinuousQueryEntry entry) {
+ return !entry.isFiltered();
+ }
}
- }
- );
+ );
- locLsnr.onUpdated(evts);
+ locLsnr.onUpdated(evts);
+ }
}
/**
@@ -878,32 +857,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
- * @param e Entry.
- * @return Continuous query entry.
- */
- private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
- if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
- e.markFiltered();
-
- return e;
- }
- else {
- buf.add(e.updateCounter());
-
- // Double check. If another thread sent a event with counter higher than this event.
- if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
- buf.remove(e.updateCounter());
-
- e.markFiltered();
-
- return e;
- }
- else
- return null;
- }
- }
-
- /**
* Add continuous entry.
*
* @param e Cache continuous query entry.
@@ -1037,7 +990,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
nodes.addAll(ctx.discovery().cacheNodes(topVer));
for (ClusterNode node : nodes) {
- if (!node.id().equals(ctx.localNodeId())) {
+ if (!node.id().equals(ctx.localNodeId()) && !node.isClient()) {
try {
cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index dce04de..6a5c7e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -67,8 +67,11 @@ public interface CacheContinuousQueryListener<K, V> {
/**
* @param evt Event
* @param topVer Topology version.
+ * @param primary Primary
+ * @param recordIgniteEvt Whether to record event.
*/
- public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+ public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+ boolean primary, boolean recordIgniteEvt);
/**
* @param part Partition.
http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index cc59989..afc3f24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -163,7 +163,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param topVer Topology version.
*/
public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
- KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+ KeyCacheObject key,
+ int partId,
+ long updCntr,
+ AffinityTopologyVersion topVer) {
assert lsnrs != null;
for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
@@ -181,7 +184,44 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
- lsnr.skipUpdateEvent(evt, topVer);
+ lsnr.skipUpdateEvent(evt, topVer, true, false);
+ }
+ }
+
+ /**
+ * @param lsnrs Listeners to notify.
+ * @param key Entry key.
+ * @param partId Partition id.
+ * @param updCntr Updated counter.
+ * @param topVer Topology version.
+ * @param primary Primary.
+ * @param recordIgniteEvt Whether to record event.
+ */
+ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+ KeyCacheObject key,
+ int partId,
+ long updCntr,
+ boolean primary,
+ boolean recordIgniteEvt,
+ AffinityTopologyVersion topVer) {
+ assert lsnrs != null;
+
+ for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ UPDATED,
+ key,
+ null,
+ null,
+ lsnr.keepBinary(),
+ partId,
+ updCntr,
+ topVer);
+
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+ cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+ lsnr.skipUpdateEvent(evt, topVer, primary, recordIgniteEvt);
}
}
@@ -277,8 +317,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean hasNewVal = newVal != null;
boolean hasOldVal = oldVal != null;
+ boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
if (!hasNewVal && !hasOldVal) {
- skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+ skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, recordIgniteEvt, topVer);
return;
}
@@ -287,8 +329,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean initialized = false;
- boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
if (preload && !lsnr.notifyExisting())
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index d9b2091..4eb133e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import java.io.Serializable;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -31,10 +32,12 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.store.CacheStore;
@@ -46,6 +49,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -76,6 +80,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
private static final int VALS = 10;
/** */
+ public static final int ITERATION_CNT = 1000;
+
+ /** */
private boolean client;
/** {@inheritDoc} */
@@ -110,6 +117,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
* @throws Exception If failed.
*/
+ public void testAtomicClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomic() throws Exception {
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
1,
@@ -117,7 +137,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ONHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
}
/**
@@ -130,7 +150,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ONHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
@@ -143,7 +176,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
OFFHEAP_VALUES,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicOffheapValuesClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
@@ -156,7 +202,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
OFFHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicOffheapTieredClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ ATOMIC,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
@@ -169,7 +228,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ONHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicNoBackupsClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
@@ -182,7 +254,46 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ONHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxClientExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, true);
+ }
/**
@@ -195,7 +306,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ONHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxReplicatedClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
@@ -208,7 +332,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
OFFHEAP_VALUES,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOffheapValuesExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOffheapValuesClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_VALUES,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
@@ -221,7 +371,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
OFFHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOffheapTieredClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOffheapTieredClientExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 1,
+ TRANSACTIONAL,
+ OFFHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, true);
}
/**
@@ -234,18 +410,52 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ONHEAP_TIERED,
false);
- testContinuousQuery(ccfg);
+ testContinuousQuery(ccfg, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackupsExplicit() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoBackupsClient() throws Exception {
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+ 0,
+ TRANSACTIONAL,
+ ONHEAP_TIERED,
+ false);
+
+ testContinuousQuery(ccfg, true, false);
}
/**
* @param ccfg Cache configuration.
+ * @param client Client.
+ * @param expTx Explicit tx.
* @throws Exception If failed.
*/
- private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+ private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean client, boolean expTx)
+ throws Exception {
ignite(0).createCache(ccfg);
try {
- IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName());
+ IgniteCache<Object, Object> cache;
+
+ if (client)
+ cache = ignite(NODES - 1).cache(ccfg.getName());
+ else
+ cache = ignite(0).cache(ccfg.getName());
long seed = System.currentTimeMillis();
@@ -256,15 +466,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
- new ArrayBlockingQueue<>(10_000);
+ new ArrayBlockingQueue<>(50_000);
qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
- for (CacheEntryEvent<?, ?> evt : evts) {
- // System.out.println("Event: " + evt);
-
+ for (CacheEntryEvent<?, ?> evt : evts)
evtsQueue.add(evt);
- }
}
});
@@ -272,12 +479,14 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+ Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
try {
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < ITERATION_CNT; i++) {
if (i % 100 == 0)
log.info("Iteration: " + i);
- randomUpdate(rnd, evtsQueue, expData, cache);
+ randomUpdate(rnd, evtsQueue, expData, partCntr, cache, expTx);
}
}
finally {
@@ -293,14 +502,18 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
* @param rnd Random generator.
* @param evtsQueue Events queue.
* @param expData Expected cache data.
+ * @param partCntr Partition counter.
* @param cache Cache.
+ * @param expTx Explicit TX.
* @throws Exception If failed.
*/
private void randomUpdate(
Random rnd,
BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
ConcurrentMap<Object, Object> expData,
- IgniteCache<Object, Object> cache)
+ Map<Integer, Long> partCntr,
+ IgniteCache<Object, Object> cache,
+ boolean expTx)
throws Exception {
Object key = new QueryTestKey(rnd.nextInt(KEYS));
Object newVal = value(rnd);
@@ -308,159 +521,250 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
int op = rnd.nextInt(11);
- // log.info("Random operation [key=" + key + ", op=" + op + ']');
+ Ignite ignite = cache.unwrap(Ignite.class);
- switch (op) {
- case 0: {
- cache.put(key, newVal);
+ Transaction tx = null;
- waitEvent(evtsQueue, key, newVal, oldVal);
+ if (expTx && cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+ tx = ignite.transactions().txStart();
- expData.put(key, newVal);
+ try {
+ // log.info("Random operation [key=" + key + ", op=" + op + ']');
- break;
- }
+ switch (op) {
+ case 0: {
+ cache.put(key, newVal);
- case 1: {
- cache.getAndPut(key, newVal);
+ if (tx != null)
+ tx.commit();
- waitEvent(evtsQueue, key, newVal, oldVal);
+ updatePartitionCounter(cache, key, partCntr);
- expData.put(key, newVal);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
- break;
- }
+ expData.put(key, newVal);
- case 2: {
- cache.remove(key);
+ break;
+ }
- waitEvent(evtsQueue, key, null, oldVal);
+ case 1: {
+ cache.getAndPut(key, newVal);
- expData.remove(key);
+ if (tx != null)
+ tx.commit();
- break;
- }
+ updatePartitionCounter(cache, key, partCntr);
- case 3: {
- cache.getAndRemove(key);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
- waitEvent(evtsQueue, key, null, oldVal);
+ expData.put(key, newVal);
- expData.remove(key);
+ break;
+ }
- break;
- }
+ case 2: {
+ cache.remove(key);
- case 4: {
- cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+ if (tx != null)
+ tx.commit();
- waitEvent(evtsQueue, key, newVal, oldVal);
+ updatePartitionCounter(cache, key, partCntr);
- expData.put(key, newVal);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
- break;
- }
+ expData.remove(key);
- case 5: {
- cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+ break;
+ }
- waitEvent(evtsQueue, key, null, oldVal);
+ case 3: {
+ cache.getAndRemove(key);
- expData.remove(key);
+ if (tx != null)
+ tx.commit();
- break;
- }
+ updatePartitionCounter(cache, key, partCntr);
- case 6: {
- cache.putIfAbsent(key, newVal);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
- if (oldVal == null) {
- waitEvent(evtsQueue, key, newVal, null);
+ expData.remove(key);
- expData.put(key, newVal);
+ break;
}
- else
- checkNoEvent(evtsQueue);
- break;
- }
+ case 4: {
+ cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
- case 7: {
- cache.getAndPutIfAbsent(key, newVal);
+ if (tx != null)
+ tx.commit();
- if (oldVal == null) {
- waitEvent(evtsQueue, key, newVal, null);
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
expData.put(key, newVal);
+
+ break;
}
- else
- checkNoEvent(evtsQueue);
- break;
- }
+ case 5: {
+ cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
- case 8: {
- cache.replace(key, newVal);
+ if (tx != null)
+ tx.commit();
- if (oldVal != null) {
- waitEvent(evtsQueue, key, newVal, oldVal);
+ updatePartitionCounter(cache, key, partCntr);
- expData.put(key, newVal);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
+
+ expData.remove(key);
+
+ break;
}
- else
- checkNoEvent(evtsQueue);
- break;
- }
+ case 6: {
+ cache.putIfAbsent(key, newVal);
- case 9: {
- cache.getAndReplace(key, newVal);
+ if (tx != null)
+ tx.commit();
- if (oldVal != null) {
- waitEvent(evtsQueue, key, newVal, oldVal);
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
- expData.put(key, newVal);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
}
- else
- checkNoEvent(evtsQueue);
- break;
- }
+ case 7: {
+ cache.getAndPutIfAbsent(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal == null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 8: {
+ cache.replace(key, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else
+ checkNoEvent(evtsQueue);
- case 10: {
- if (oldVal != null) {
- Object replaceVal = value(rnd);
+ break;
+ }
+
+ case 9: {
+ cache.getAndReplace(key, newVal);
- boolean success = replaceVal.equals(oldVal);
+ if (tx != null)
+ tx.commit();
- if (success) {
- cache.replace(key, replaceVal, newVal);
+ if (oldVal != null) {
+ updatePartitionCounter(cache, key, partCntr);
- waitEvent(evtsQueue, key, newVal, oldVal);
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
expData.put(key, newVal);
}
+ else
+ checkNoEvent(evtsQueue);
+
+ break;
+ }
+
+ case 10: {
+ if (oldVal != null) {
+ Object replaceVal = value(rnd);
+
+ boolean success = replaceVal.equals(oldVal);
+
+ if (success) {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ updatePartitionCounter(cache, key, partCntr);
+
+ waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+
+ expData.put(key, newVal);
+ }
+ else {
+ cache.replace(key, replaceVal, newVal);
+
+ if (tx != null)
+ tx.commit();
+
+ checkNoEvent(evtsQueue);
+ }
+ }
else {
- cache.replace(key, replaceVal, newVal);
+ cache.replace(key, value(rnd), newVal);
+
+ if (tx != null)
+ tx.commit();
checkNoEvent(evtsQueue);
}
- }
- else {
- cache.replace(key, value(rnd), newVal);
- checkNoEvent(evtsQueue);
+ break;
}
- break;
+ default:
+ fail();
}
-
- default:
- fail();
+ } finally {
+ if (tx != null)
+ tx.close();
}
}
/**
+ * @param cache Cache.
+ * @param key Key
+ * @param cntrs Partition counters.
+ */
+ private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+ Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+ int part = aff.partition(key);
+
+ Long partCntr = cntrs.get(part);
+
+ if (partCntr == null)
+ partCntr = 0L;
+
+ cntrs.put(part, ++partCntr);
+ }
+
+ /**
* @param rnd Random generator.
* @return Cache value.
*/
@@ -470,13 +774,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
/**
* @param evtsQueue Event queue.
+ * @param partCntrs Partition counters.
+ * @param aff Affinity function.
* @param key Key.
* @param val Value.
* @param oldVal Old value.
* @throws Exception If failed.
*/
- private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
- Object key, Object val, Object oldVal) throws Exception {
+ private void waitAndCheckEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+ Map<Integer, Long> partCntrs,
+ Affinity<Object> aff,
+ Object key,
+ Object val,
+ Object oldVal) throws Exception {
if (val == null && oldVal == null) {
checkNoEvent(evtsQueue);
@@ -485,12 +795,15 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
- assertNotNull("Failed to wait for event [key=" + key +
- ", val=" + val +
- ", oldVal=" + oldVal + ']', evt);
+ assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
assertEquals(key, evt.getKey());
assertEquals(val, evt.getValue());
assertEquals(oldVal, evt.getOldValue());
+
+ Long cntr = partCntrs.get(aff.partition(key));
+
+ assertNotNull(cntr);
+ assertEquals(cntr, evt.unwrap(Long.class));
}
/**
@@ -644,6 +957,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
return S.toString(QueryTestValue.class, this);
}
}
+
/**
*
*/
@@ -680,5 +994,4 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
return S.toString(EntrySetValueProcessor.class, this);
}
}
-
}