You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/10 19:18:49 UTC

[1/2] ignite git commit: IGNITE-2515 Implemented that unwrap(Long) returns update counter. Added tests.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2515 [created] 301abcfa6


IGNITE-2515 Implemented that unwrap(Long) returns update counter. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4252c5db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4252c5db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4252c5db

Branch: refs/heads/ignite-2515
Commit: 4252c5db69161dcff56b1b0690b3c992bbb904ba
Parents: d08a779
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Feb 9 16:12:16 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 10 18:22:12 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryEvent.java   |  10 +-
 ...CacheContinuousQueryCounterAbstractTest.java | 614 +++++++++++++++++++
 ...inuousQueryCounterPartitionedAtomicTest.java |  41 ++
 ...ContinuousQueryCounterPartitionedTxTest.java |  41 ++
 ...tinuousQueryCounterReplicatedAtomicTest.java |  41 ++
 ...eContinuousQueryCounterReplicatedTxTest.java |  41 ++
 .../IgniteCacheQuerySelfTestSuite.java          |   8 +
 7 files changed, 791 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26e666..d1c7c28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -57,8 +57,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public K getKey() {
+    @Override public K getKey() {
         return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
     }
 
@@ -68,8 +67,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public V getOldValue() {
+    @Override public V getOldValue() {
         return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.oldValue(), e.isKeepBinary(), false);
     }
 
@@ -80,8 +78,10 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
 
     /** {@inheritDoc} */
     @Override public <T> T unwrap(Class<T> cls) {
-        if(cls.isAssignableFrom(getClass()))
+        if (cls.isAssignableFrom(getClass()))
             return cls.cast(this);
+        else if (cls == Long.class)
+            return cls.cast(e.updateCounter());
 
         throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
new file mode 100644
index 0000000..3a10e1c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKeyImpl;
+import org.apache.ignite.internal.util.typedef.P2;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Continuous queries counter tests.
+ */
+public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommonAbstractTest
+    implements Serializable {
+    /** */
+    protected static final String CACHE_NAME = "test_cache";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Latch timeout. */
+    protected static final long LATCH_TIMEOUT = 5000;
+
+    /** */
+    private static final String NO_CACHE_GRID_NAME = "noCacheGrid";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(peerClassLoadingEnabled());
+
+        if (gridName.equals(NO_CACHE_GRID_NAME))
+            cfg.setClientMode(true);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    @NotNull private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setAtomicityMode(atomicityMode());
+        cacheCfg.setNearConfiguration(nearConfiguration());
+        cacheCfg.setRebalanceMode(ASYNC);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setCacheStoreFactory(new StoreFactory());
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        return cacheCfg;
+    }
+
+    /**
+     * @return Peer class loading enabled flag.
+     */
+    protected boolean peerClassLoadingEnabled() {
+        return true;
+    }
+
+    /**
+     * @return Distribution.
+     */
+    protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                for (int i = 0; i < gridCount(); i++) {
+                    if (grid(i).cluster().nodes().size() != gridCount())
+                        return false;
+                }
+
+                return true;
+            }
+        }, 3000);
+
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).destroyCache(CACHE_NAME);
+
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).getOrCreateCache(cacheConfiguration());
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * @return Grids count.
+     */
+    protected abstract int gridCount();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAllEntries() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(5);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
+
+            cache.remove(2);
+
+            cache.put(1, 10);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(3, map.size());
+
+            List<T2<Integer, Long>> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(1, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+            assertEquals(10, (int)vals.get(1).get1());
+            assertEquals(2L, (long)vals.get(1).get2());
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(2, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+            assertNull(vals.get(1).get1());
+
+            vals = map.get(3);
+
+            assertNotNull(vals);
+            assertEquals(1, vals.size());
+            assertEquals(3, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoQueryListener() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        final IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+        final IgniteCache<Integer, Integer> cache1 = grid(1).cache(CACHE_NAME);
+
+        final AtomicInteger cntr = new AtomicInteger(0);
+        final AtomicInteger cntr1 = new AtomicInteger(0);
+
+        final ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+        final ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map1 = new HashMap<>();
+        final Map<Integer, List<T2<Integer, Long>>> map2 = new HashMap<>();
+
+        qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr.incrementAndGet();
+
+                    synchronized (map1) {
+                        List<T2<Integer, Long>> vals = map1.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map1.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+                    }
+                }
+            }
+        });
+
+        qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr1.incrementAndGet();
+
+                    synchronized (map2) {
+                        List<T2<Integer, Long>> vals = map2.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map2.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+                    }
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
+            QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Object, Object> cache0 = grid(i).cache(CACHE_NAME);
+
+                cache0.put(1, 1);
+                cache0.put(2, 2);
+                cache0.put(3, 3);
+
+                cache0.remove(1);
+                cache0.remove(2);
+                cache0.remove(3);
+
+                final int iter = i + 1;
+
+                assert GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return iter * 6 /* count operation */ * 2 /* count continues queries*/
+                            == (cntr.get() + cntr1.get());
+                    }
+                }, 5000L);
+
+                checkEvents(map1, i);
+
+                map1.clear();
+
+                checkEvents(map2, i);
+
+                map2.clear();
+            }
+        }
+    }
+
+    /**
+     * @param evnts Events.
+     * @param iter Iteration.
+     */
+    private void checkEvents(Map<Integer, List<T2<Integer, Long>>> evnts, long iter) {
+        List<T2<Integer, Long>> val = evnts.get(1);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 1
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(1L, (long)val.get(0).get1());
+
+        // Check remove 1
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+
+        val = evnts.get(2);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 2
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(2L, (long)val.get(0).get1());
+
+        // Check remove 2
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+
+        val = evnts.get(3);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 3
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(3L, (long)val.get(0).get1());
+
+        // Check remove 3
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        final int keyCnt = 300;
+
+        final int updateKey = 1;
+
+        for (int i = 0; i < keyCnt; i++)
+            cache.put(updateKey, i);
+
+        for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+                final AtomicInteger cntr = new AtomicInteger(0);
+
+                ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+                final List<T2<Integer, Long>> vals = new ArrayList<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+                    @Override public void onUpdated(
+                        Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                            cntr.incrementAndGet();
+
+                            synchronized (vals) {
+                                vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+                            }
+                        }
+                    }
+                });
+
+                try (QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache.query(qry)) {
+                    for (int key = 0; key < keyCnt; key++)
+                        cache.put(updateKey, cache.get(updateKey) + 1);
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return cntr.get() == keyCnt;
+                        }
+                    }, 2000L);
+
+                    for (T2<Integer, Long> val : vals) {
+                        assertEquals(vals.size(), keyCnt);
+
+                        assertEquals((long)val.get1() + 1, (long)val.get2());
+                    }
+                }
+            }
+            else {
+                for (int key = 0; key < keyCnt; key++)
+                    cache.put(updateKey, cache.get(updateKey) + 1);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesByFilter() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(8);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(), e.unwrap(Long.class)));
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getValue() % 2 == 0;
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(1, 2);
+            cache.put(1, 3);
+            cache.put(1, 4);
+
+            cache.put(2, 1);
+            cache.put(2, 2);
+            cache.put(2, 3);
+            cache.put(2, 4);
+
+            cache.remove(1);
+            cache.remove(2);
+
+            cache.put(1, 10);
+            cache.put(2, 40);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(2, map.size());
+
+            List<T2<Integer, Long>> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(4, vals.size());
+
+            assertEquals((int)vals.get(0).get1(), 2);
+            assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+            assertEquals((int)vals.get(1).get1(), 4);
+            assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+            assertNull(vals.get(2).get1());
+            assertEquals(5, (long)vals.get(2).get2());
+
+            assertEquals((int)vals.get(3).get1(), 10);
+            assertEquals(6, (long)vals.get(3).get2());
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(4, vals.size());
+
+            assertEquals((int)vals.get(0).get1(), 2);
+            assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+            assertEquals((int)vals.get(1).get1(), 4);
+            assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+            assertNull(vals.get(2).get1());
+            assertEquals(5, (long)vals.get(2).get2());
+
+            assertEquals((int)vals.get(3).get1(), 40);
+            assertEquals(6, (long)vals.get(3).get2());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCache() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, T2<Integer, Long>> map = new ConcurrentHashMap8<>();
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    map.put(e.getKey(), new T2<>(e.getValue(), e.unwrap(Long.class)));
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.loadCache(null, 0);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount();
+
+            assertEquals(10, map.size());
+
+            for (int i = 0; i < 10; i++) {
+                assertEquals(i, (int)map.get(i).get1());
+                assertEquals((long)1, (long)map.get(i).get2());
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<CacheStore> {
+        @Override public CacheStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     * Store.
+     */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+            for (int i = 0; i < 10; i++)
+                clo.apply(i, i);
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object load(Object key) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
new file mode 100644
index 0000000..7b97928
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
new file mode 100644
index 0000000..aa42832
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
new file mode 100644
index 0000000..afa7a22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
new file mode 100644
index 0000000..4ee12de
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4252c5db/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..e161b3a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,6 +70,10 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
@@ -206,6 +210,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterPartitionedAtomicTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterPartitionedTxTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterReplicatedAtomicTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterReplicatedTxTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);


[2/2] ignite git commit: IGNITE-2515 Improvement tests.

Posted by nt...@apache.org.
IGNITE-2515 Improvement tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/301abcfa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/301abcfa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/301abcfa

Branch: refs/heads/ignite-2515
Commit: 301abcfa63290ac27407979c33849663f14f5d9b
Parents: 4252c5d
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Feb 10 21:18:05 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 10 21:18:56 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java |  91 +---
 .../CacheContinuousQueryListener.java           |   5 +-
 .../continuous/CacheContinuousQueryManager.java |  50 +-
 ...acheContinuousQueryRandomOperationsTest.java | 543 +++++++++++++++----
 4 files changed, 499 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cf9b439..9f725a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -327,7 +327,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                 boolean notify = true;
 
-                if (rmtFilter != null) {
+                if (!evt.entry().isFiltered() && rmtFilter != null) {
                     try {
                         notify = rmtFilter.evaluate(evt);
                     }
@@ -472,38 +472,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) {
-                try {
-                    assert evt != null;
-
-                    CacheContinuousQueryEntry e = evt.entry();
-
-                    EntryBuffer buf = entryBufs.get(e.partition());
-
-                    if (buf == null) {
-                        buf = new EntryBuffer();
-
-                        EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+                boolean primary, boolean recordIgniteEvt) {
+                assert evt != null;
 
-                        if (oldRec != null)
-                            buf = oldRec;
-                    }
-
-                    e = buf.skipEntry(e);
+                CacheContinuousQueryEntry e = evt.entry();
 
-                    if (e != null && !ctx.localNodeId().equals(nodeId))
-                        ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
-                }
-                catch (ClusterTopologyCheckedException ex) {
-                    IgniteLogger log = ctx.log(getClass());
+                e.markFiltered();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send event notification to node, node left cluster " +
-                                "[node=" + nodeId + ", err=" + ex + ']');
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                }
+                onEntryUpdated(evt, primary, recordIgniteEvt);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -618,20 +595,22 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         for (CacheContinuousQueryEntry e : entries)
             entries0.addAll(handleEvent(ctx, e));
 
-        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-            new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
-                    return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                }
-            },
-            new IgnitePredicate<CacheContinuousQueryEntry>() {
-                @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                    return !entry.isFiltered();
+        if (!entries0.isEmpty()) {
+            Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+                new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                    @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+                        return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                    }
+                },
+                new IgnitePredicate<CacheContinuousQueryEntry>() {
+                    @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                        return !entry.isFiltered();
+                    }
                 }
-            }
-        );
+            );
 
-        locLsnr.onUpdated(evts);
+            locLsnr.onUpdated(evts);
+        }
     }
 
     /**
@@ -878,32 +857,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         /**
-         * @param e Entry.
-         * @return Continuous query entry.
-         */
-        private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
-            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
-                e.markFiltered();
-
-                return e;
-            }
-            else {
-                buf.add(e.updateCounter());
-
-                // Double check. If another thread sent a event with counter higher than this event.
-                if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
-                    buf.remove(e.updateCounter());
-
-                    e.markFiltered();
-
-                    return e;
-                }
-                else
-                    return null;
-            }
-        }
-
-        /**
          * Add continuous entry.
          *
          * @param e Cache continuous query entry.
@@ -1037,7 +990,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         nodes.addAll(ctx.discovery().cacheNodes(topVer));
 
                     for (ClusterNode node : nodes) {
-                        if (!node.id().equals(ctx.localNodeId())) {
+                        if (!node.id().equals(ctx.localNodeId()) && !node.isClient()) {
                             try {
                                 cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index dce04de..6a5c7e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -67,8 +67,11 @@ public interface CacheContinuousQueryListener<K, V> {
     /**
      * @param evt Event
      * @param topVer Topology version.
+     * @param primary Primary
+     * @param recordIgniteEvt Whether to record event.
      */
-    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+        boolean primary, boolean recordIgniteEvt);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index cc59989..afc3f24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -163,7 +163,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param topVer Topology version.
      */
     public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
-        KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+        KeyCacheObject key,
+        int partId,
+        long updCntr,
+        AffinityTopologyVersion topVer) {
         assert lsnrs != null;
 
         for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
@@ -181,7 +184,44 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.skipUpdateEvent(evt, topVer);
+            lsnr.skipUpdateEvent(evt, topVer, true, false);
+        }
+    }
+
+    /**
+     * @param lsnrs Listeners to notify.
+     * @param key Entry key.
+     * @param partId Partition id.
+     * @param updCntr Updated counter.
+     * @param topVer Topology version.
+     * @param primary Primary.
+     * @param recordIgniteEvt Whether to record event.
+     */
+    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+        KeyCacheObject key,
+        int partId,
+        long updCntr,
+        boolean primary,
+        boolean recordIgniteEvt,
+        AffinityTopologyVersion topVer) {
+        assert lsnrs != null;
+
+        for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                cctx.cacheId(),
+                UPDATED,
+                key,
+                null,
+                null,
+                lsnr.keepBinary(),
+                partId,
+                updCntr,
+                topVer);
+
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+            lsnr.skipUpdateEvent(evt, topVer, primary, recordIgniteEvt);
         }
     }
 
@@ -277,8 +317,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean hasNewVal = newVal != null;
         boolean hasOldVal = oldVal != null;
 
+        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
         if (!hasNewVal && !hasOldVal) {
-            skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, recordIgniteEvt, topVer);
 
             return;
         }
@@ -287,8 +329,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean initialized = false;
 
-        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
                 continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/301abcfa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index d9b2091..4eb133e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -31,10 +32,12 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.store.CacheStore;
@@ -46,6 +49,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -76,6 +80,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     private static final int VALS = 10;
 
     /** */
+    public static final int ITERATION_CNT = 1000;
+
+    /** */
     private boolean client;
 
     /** {@inheritDoc} */
@@ -110,6 +117,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomic() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -117,7 +137,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
     }
 
     /**
@@ -130,7 +150,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -143,7 +176,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -156,7 +202,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTieredClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -169,7 +228,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackupsClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -182,7 +254,46 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClientExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -195,7 +306,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -208,7 +332,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
@@ -221,7 +371,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredClientExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, true);
     }
 
     /**
@@ -234,18 +410,52 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, true, false);
     }
 
     /**
      * @param ccfg Cache configuration.
+     * @param client Client.
+     * @param expTx Explicit tx.
      * @throws Exception If failed.
      */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean client, boolean expTx)
+        throws Exception {
         ignite(0).createCache(ccfg);
 
         try {
-            IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName());
+            IgniteCache<Object, Object> cache;
+
+            if (client)
+                cache = ignite(NODES - 1).cache(ccfg.getName());
+            else
+                cache = ignite(0).cache(ccfg.getName());
 
             long seed = System.currentTimeMillis();
 
@@ -256,15 +466,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
             final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
-                new ArrayBlockingQueue<>(10_000);
+                new ArrayBlockingQueue<>(50_000);
 
             qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
                 @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                    for (CacheEntryEvent<?, ?> evt : evts) {
-                        // System.out.println("Event: " + evt);
-
+                    for (CacheEntryEvent<?, ?> evt : evts)
                         evtsQueue.add(evt);
-                    }
                 }
             });
 
@@ -272,12 +479,14 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
             ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
 
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
             try {
-                for (int i = 0; i < 1000; i++) {
+                for (int i = 0; i < ITERATION_CNT; i++) {
                     if (i % 100 == 0)
                         log.info("Iteration: " + i);
 
-                    randomUpdate(rnd, evtsQueue, expData, cache);
+                    randomUpdate(rnd, evtsQueue, expData, partCntr, cache, expTx);
                 }
             }
             finally {
@@ -293,14 +502,18 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param rnd Random generator.
      * @param evtsQueue Events queue.
      * @param expData Expected cache data.
+     * @param partCntr Partition counter.
      * @param cache Cache.
+     * @param expTx Explicit TX.
      * @throws Exception If failed.
      */
     private void randomUpdate(
         Random rnd,
         BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
         ConcurrentMap<Object, Object> expData,
-        IgniteCache<Object, Object> cache)
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache,
+        boolean expTx)
         throws Exception {
         Object key = new QueryTestKey(rnd.nextInt(KEYS));
         Object newVal = value(rnd);
@@ -308,159 +521,250 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
         int op = rnd.nextInt(11);
 
-        // log.info("Random operation [key=" + key + ", op=" + op + ']');
+        Ignite ignite = cache.unwrap(Ignite.class);
 
-        switch (op) {
-            case 0: {
-                cache.put(key, newVal);
+        Transaction tx = null;
 
-                waitEvent(evtsQueue, key, newVal, oldVal);
+        if (expTx && cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+            tx = ignite.transactions().txStart();
 
-                expData.put(key, newVal);
+        try {
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
 
-                break;
-            }
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
 
-            case 1: {
-                cache.getAndPut(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                waitEvent(evtsQueue, key, newVal, oldVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                expData.put(key, newVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
 
-                break;
-            }
+                    expData.put(key, newVal);
 
-            case 2: {
-                cache.remove(key);
+                    break;
+                }
 
-                waitEvent(evtsQueue, key, null, oldVal);
+                case 1: {
+                    cache.getAndPut(key, newVal);
 
-                expData.remove(key);
+                    if (tx != null)
+                        tx.commit();
 
-                break;
-            }
+                    updatePartitionCounter(cache, key, partCntr);
 
-            case 3: {
-                cache.getAndRemove(key);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
 
-                waitEvent(evtsQueue, key, null, oldVal);
+                    expData.put(key, newVal);
 
-                expData.remove(key);
+                    break;
+                }
 
-                break;
-            }
+                case 2: {
+                    cache.remove(key);
 
-            case 4: {
-                cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+                    if (tx != null)
+                        tx.commit();
 
-                waitEvent(evtsQueue, key, newVal, oldVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                expData.put(key, newVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
 
-                break;
-            }
+                    expData.remove(key);
 
-            case 5: {
-                cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+                    break;
+                }
 
-                waitEvent(evtsQueue, key, null, oldVal);
+                case 3: {
+                    cache.getAndRemove(key);
 
-                expData.remove(key);
+                    if (tx != null)
+                        tx.commit();
 
-                break;
-            }
+                    updatePartitionCounter(cache, key, partCntr);
 
-            case 6: {
-                cache.putIfAbsent(key, newVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
 
-                if (oldVal == null) {
-                    waitEvent(evtsQueue, key, newVal, null);
+                    expData.remove(key);
 
-                    expData.put(key, newVal);
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
 
-            case 7: {
-                cache.getAndPutIfAbsent(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                if (oldVal == null) {
-                    waitEvent(evtsQueue, key, newVal, null);
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
 
                     expData.put(key, newVal);
+
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
 
-            case 8: {
-                cache.replace(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                if (oldVal != null) {
-                    waitEvent(evtsQueue, key, newVal, oldVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                    expData.put(key, newVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
 
-            case 9: {
-                cache.getAndReplace(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                if (oldVal != null) {
-                    waitEvent(evtsQueue, key, newVal, oldVal);
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
 
-                    expData.put(key, newVal);
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueue);
+
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueue);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueue);
 
-            case 10: {
-                if (oldVal != null) {
-                    Object replaceVal = value(rnd);
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
 
-                    boolean success = replaceVal.equals(oldVal);
+                    if (tx != null)
+                        tx.commit();
 
-                    if (success) {
-                        cache.replace(key, replaceVal, newVal);
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
 
-                        waitEvent(evtsQueue, key, newVal, oldVal);
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
 
                         expData.put(key, newVal);
                     }
+                    else
+                        checkNoEvent(evtsQueue);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(evtsQueue);
+                        }
+                    }
                     else {
-                        cache.replace(key, replaceVal, newVal);
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
 
                         checkNoEvent(evtsQueue);
                     }
-                }
-                else {
-                    cache.replace(key, value(rnd), newVal);
 
-                    checkNoEvent(evtsQueue);
+                    break;
                 }
 
-                break;
+                default:
+                    fail();
             }
-
-            default:
-                fail();
+        } finally {
+            if (tx != null)
+                tx.close();
         }
     }
 
     /**
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long partCntr = cntrs.get(part);
+
+        if (partCntr == null)
+            partCntr = 0L;
+
+        cntrs.put(part, ++partCntr);
+    }
+
+    /**
      * @param rnd Random generator.
      * @return Cache value.
      */
@@ -470,13 +774,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
     /**
      * @param evtsQueue Event queue.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
      * @param key Key.
      * @param val Value.
      * @param oldVal Old value.
      * @throws Exception If failed.
      */
-    private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
-        Object key, Object val, Object oldVal) throws Exception {
+    private void waitAndCheckEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal) throws Exception {
         if (val == null && oldVal == null) {
             checkNoEvent(evtsQueue);
 
@@ -485,12 +795,15 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
         CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
 
-        assertNotNull("Failed to wait for event [key=" + key +
-            ", val=" + val +
-            ", oldVal=" + oldVal + ']', evt);
+        assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
         assertEquals(key, evt.getKey());
         assertEquals(val, evt.getValue());
         assertEquals(oldVal, evt.getOldValue());
+
+        Long cntr = partCntrs.get(aff.partition(key));
+
+        assertNotNull(cntr);
+        assertEquals(cntr, evt.unwrap(Long.class));
     }
 
     /**
@@ -644,6 +957,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             return S.toString(QueryTestValue.class, this);
         }
     }
+
     /**
      *
      */
@@ -680,5 +994,4 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             return S.toString(EntrySetValueProcessor.class, this);
         }
     }
-
 }