You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/01 12:09:41 UTC

[15/20] ignite git commit: IGNITE-1186 WIP

IGNITE-1186 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ad476b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ad476b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ad476b2

Branch: refs/heads/ignite-1186
Commit: 9ad476b21a1f6f9d52d8efcaad346a4ac85ea73b
Parents: e80e906
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Feb 29 20:56:35 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Feb 29 20:56:35 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     |   10 +-
 .../internal/GridEventConsumeHandler.java       |    6 +
 .../internal/GridMessageListenHandler.java      |    6 +
 .../processors/cache/IgniteCacheProxy.java      |   13 +-
 .../continuous/CacheContinuousQueryHandler.java |    9 +-
 .../CacheContinuousQueryHandlerV2.java          |   20 +-
 .../continuous/CacheContinuousQueryManager.java |  263 ++++-
 .../continuous/GridContinuousHandler.java       |    6 +
 .../IgniteCacheEntryListenerAbstractTest.java   |   66 +-
 .../CacheContinuousQueryFactoryFilterTest.java  | 1002 +++---------------
 .../CacheContinuousQueryOperationP2PTest.java   |  394 +++++++
 ...acheContinuousQueryRandomOperationsTest.java |   10 +-
 .../p2p/CacheDeploymentEntryEventFilter.java    |   33 +
 .../CacheDeploymentEntryEventFilterFactory.java |   31 +
 14 files changed, 924 insertions(+), 945 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 452735e..3ea8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.query;
 
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -121,7 +122,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
     /** Remote filter factory. */
-    private Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory;
+    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
 
     /** Time interval. */
     private long timeInterval = DFLT_TIME_INTERVAL;
@@ -200,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @param rmtFilter Key-value filter.
      * @return {@code this} for chaining.
+     *
+     * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
      */
+    @Deprecated
     public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
         this.rmtFilter = rmtFilter;
 
@@ -228,7 +232,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * @return {@code this} for chaining.
      */
     public ContinuousQuery<K, V> setRemoteFilterFactory(
-        Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory) {
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
         this.rmtFilterFactory = rmtFilterFactory;
 
         return this;
@@ -239,7 +243,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      *
      * @return Remote filter.
      */
-    public Factory<? extends CacheEntryEventSerializableFilter<K, V>> getRemoteFilterFactory() {
+    public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
         return rmtFilterFactory;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..924a8ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
+import javax.cache.event.CacheEntryEventFilter;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
@@ -141,6 +142,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter getEventFilter() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..e157c98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import javax.cache.event.CacheEntryEventFilter;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
@@ -130,6 +131,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter getEventFilter() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         ctx.io().addUserMessageListener(topic, pred);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9e9b985..690e0b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -97,16 +97,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
-    private static final IgniteBiPredicate ACCEPT_ALL = new IgniteBiPredicate() {
-        /** */
-        private static final long serialVersionUID = -1640538788290240617L;
-
-        @Override public boolean apply(Object k, Object v) {
-            return true;
-        }
-    };
-
     /** Context. */
     private GridCacheContext<K, V> ctx;
 
@@ -565,6 +555,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         if (qry.getLocalListener() == null)
             throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
 
+        if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+            throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
         try {
             final UUID routineId = ctx.continuousQueries().executeQuery(
                 qry.getLocalListener(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 47f5c52..393f7fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -263,7 +262,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (locLsnr != null)
             ctx.resource().injectGeneric(locLsnr);
 
-        final CacheEntryEventFilter filter = getRemoteFilter();
+        final CacheEntryEventFilter filter = getEventFilter();
 
         if (filter != null)
             ctx.resource().injectGeneric(filter);
@@ -521,10 +520,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         return mgr.registerListener(routineId, lsnr, internal);
     }
 
-    /**
-     * @return Remote filter.
-     */
-    protected CacheEntryEventFilter<K, V> getRemoteFilter() {
+    /** {@inheritDoc} */
+    public CacheEntryEventFilter getEventFilter() {
         return rmtFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 4573e6c..628e1c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -26,8 +26,10 @@ import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheRemoteQueryFactory;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -40,13 +42,13 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     private static final long serialVersionUID = 0L;
 
     /** Remote filter factory. */
-    private Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory;
+    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
 
     /** Deployable object for filter factory. */
     private DeployableObject rmtFilterFactoryDep;
 
     /** */
-    protected transient CacheEntryEventFilter<K, V> rmtNonSerFilter;
+    protected transient CacheEntryEventFilter filter;
 
     /**
      * Required by {@link Externalizable}.
@@ -76,7 +78,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         String cacheName,
         Object topic,
         CacheEntryUpdatedListener<K, V> locLsnr,
-        Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory,
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
         boolean internal,
         boolean notifyExisting,
         boolean oldValRequired,
@@ -108,14 +110,14 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     }
 
     /** {@inheritDoc} */
-    @Override protected CacheEntryEventFilter<K, V> getRemoteFilter() {
-        if (rmtNonSerFilter == null) {
+    @Override public CacheEntryEventFilter getEventFilter() {
+        if (filter == null) {
             assert rmtFilterFactory != null;
 
-            rmtNonSerFilter = rmtFilterFactory.create();
+            filter = rmtFilterFactory.create();
         }
 
-        return rmtNonSerFilter;
+        return filter;
     }
 
     /** {@inheritDoc} */
@@ -168,6 +170,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         if (b)
             rmtFilterFactoryDep = (DeployableObject)in.readObject();
         else
-            rmtFilterFactory = (Factory<CacheEntryEventSerializableFilter<K, V>>)in.readObject();
+            rmtFilterFactory = (Factory)in.readObject();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index fd4d71e..33d6d59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -21,10 +21,11 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -415,29 +416,44 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
-        CacheEntryEventSerializableFilter rmtFilter,
-        Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
+        @Nullable CacheEntryEventSerializableFilter rmtFilter,
+        @Nullable Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
         boolean loc,
         boolean keepBinary) throws IgniteCheckedException
     {
-        return executeQuery0(
-            locLsnr,
-            rmtFilter,
-            rmtFilterFactory,
-            bufSize,
-            timeInterval,
-            autoUnsubscribe,
-            false,
-            false,
-            true,
-            false,
-            true,
-            loc,
-            keepBinary,
-            false);
+        if (rmtFilterFactory != null)
+            return executeQueryWithFilterFactory(
+                locLsnr,
+                rmtFilterFactory,
+                bufSize,
+                timeInterval,
+                autoUnsubscribe,
+                false,
+                false,
+                true,
+                false,
+                true,
+                loc,
+                keepBinary,
+                false);
+        else
+            return executeQueryWithFilter(
+                locLsnr,
+                rmtFilter,
+                bufSize,
+                timeInterval,
+                autoUnsubscribe,
+                false,
+                false,
+                true,
+                false,
+                true,
+                loc,
+                keepBinary,
+                false);
     }
 
     /**
@@ -455,10 +471,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean ignoreClassNotFound)
         throws IgniteCheckedException
     {
-        return executeQuery0(
+        return executeQueryWithFilter(
             locLsnr,
             rmtFilter,
-            null,
             ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
@@ -556,8 +571,162 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+    private UUID executeQueryWithFilter(CacheEntryUpdatedListener locLsnr,
         final CacheEntryEventSerializableFilter rmtFilter,
+        int bufSize,
+        long timeInterval,
+        boolean autoUnsubscribe,
+        boolean internal,
+        boolean notifyExisting,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean loc,
+        final boolean keepBinary,
+        boolean ignoreClassNotFound) throws IgniteCheckedException
+    {
+        cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
+            cctx.kernalContext().job().currentTaskNameHash() : 0;
+
+        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
+
+        GridContinuousHandler hnd = new CacheContinuousQueryHandler(
+            cctx.name(),
+            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+            locLsnr,
+            rmtFilter,
+            internal,
+            notifyExisting,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            taskNameHash,
+            skipPrimaryCheck,
+            cctx.isLocal(),
+            keepBinary,
+            ignoreClassNotFound);
+
+        return executeQuery0(locLsnr,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            notifyExisting,
+            loc,
+            keepBinary,
+            hnd);
+    }
+
+
+    /**
+     * @param locLsnr Local listener.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param internal Internal flag.
+     * @param notifyExisting Notify existing flag.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired event flag.
+     * @param loc Local flag.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr,
+        final JCacheRemoteQueryFactory rmtFilterFactory,
+        int bufSize,
+        long timeInterval,
+        boolean autoUnsubscribe,
+        boolean internal,
+        boolean notifyExisting,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean loc,
+        final boolean keepBinary,
+        boolean ignoreClassNotFound) throws IgniteCheckedException
+    {
+        cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
+            cctx.kernalContext().job().currentTaskNameHash() : 0;
+
+        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
+
+        boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+
+        GridContinuousHandler hnd;
+
+        if (v2)
+            hnd = new CacheContinuousQueryHandlerV2(
+                cctx.name(),
+                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                locLsnr,
+                rmtFilterFactory,
+                internal,
+                notifyExisting,
+                oldValRequired,
+                sync,
+                ignoreExpired,
+                taskNameHash,
+                skipPrimaryCheck,
+                cctx.isLocal(),
+                keepBinary,
+                ignoreClassNotFound);
+        else {
+            JCacheQueryRemoteFilter fltr = null;
+
+            if (rmtFilterFactory != null) {
+                fltr = rmtFilterFactory.create();
+
+                if (!(fltr.impl instanceof Serializable))
+                    throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
+                        "EntryEventFilter must implement java.io.Serializable interface. Filter: " + fltr.impl);
+            }
+
+            hnd = new CacheContinuousQueryHandler(
+                cctx.name(),
+                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                locLsnr,
+                fltr,
+                internal,
+                notifyExisting,
+                oldValRequired,
+                sync,
+                ignoreExpired,
+                taskNameHash,
+                skipPrimaryCheck,
+                cctx.isLocal(),
+                keepBinary,
+                ignoreClassNotFound);
+        }
+
+        return executeQuery0(locLsnr,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            notifyExisting,
+            loc,
+            keepBinary,
+            hnd);
+    }
+
+    /**
+     * @param locLsnr Local listener.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param internal Internal flag.
+     * @param notifyExisting Notify existing flag.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired event flag.
+     * @param loc Local flag.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr,
         final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
         int bufSize,
         long timeInterval,
@@ -582,9 +751,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         GridContinuousHandler hnd;
 
-        if (v2) {
-            assert rmtFilter == null : rmtFilter;
-
+        if (v2)
             hnd = new CacheContinuousQueryHandlerV2(
                 cctx.name(),
                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -600,7 +767,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 cctx.isLocal(),
                 keepBinary,
                 ignoreClassNotFound);
-        }
         else {
             CacheEntryEventFilter fltr = null;
 
@@ -608,8 +774,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 fltr = rmtFilterFactory.create();
 
                 if (!(fltr instanceof CacheEntryEventSerializableFilter))
-                    throw new IgniteCheckedException("Cache entry event filter must implement " +
-                        "org.apache.ignite.cache.CacheEntryEventSerializableFilter: " + fltr);
+                    throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
+                        "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " +
+                        "interface. Filter: " + fltr);
             }
 
             hnd = new CacheContinuousQueryHandler(
@@ -629,6 +796,37 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 ignoreClassNotFound);
         }
 
+        return executeQuery0(locLsnr,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            notifyExisting,
+            loc,
+            keepBinary,
+            hnd);
+    }
+
+    /**
+     * @param locLsnr Local listener.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param notifyExisting Notify existing flag.
+     * @param loc Local flag.
+     * @param keepBinary Keep binary.
+     * @param hnd Handler.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+        int bufSize,
+        long timeInterval,
+        boolean autoUnsubscribe,
+        boolean notifyExisting,
+        boolean loc,
+        final boolean keepBinary,
+        final GridContinuousHandler hnd)
+        throws IgniteCheckedException {
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
 
@@ -694,7 +892,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                     cctx.kernalContext().cache().jcache(cctx.name()),
                                     cctx, entry);
 
-                                if (rmtFilter != null && !rmtFilter.evaluate(next))
+                                if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
                                     next = null;
                             }
                         }
@@ -825,9 +1023,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 locLsnrImpl,
                 log);
 
-            routineId = executeQuery0(
+            routineId = executeJCacheQueryFactory(
                 locLsnr,
-                null,
                 new JCacheRemoteQueryFactory(cfg.getCacheEntryEventFilterFactory(), types),
                 ContinuousQuery.DFLT_PAGE_SIZE,
                 ContinuousQuery.DFLT_TIME_INTERVAL,
@@ -1026,12 +1223,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /**
      *
      */
-    private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
+    protected static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** Factory. */
-        private Factory<CacheEntryEventFilter> impl;
+        protected Factory<CacheEntryEventFilter> impl;
 
         /** */
         private byte types;
@@ -1046,7 +1243,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public CacheEntryEventFilter create() {
+        @Override public JCacheQueryRemoteFilter create() {
             return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..232e1ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import javax.cache.event.CacheEntryEventFilter;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -158,4 +159,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
      * @param topVer Topology version.
      */
     public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
+
+    /**
+     * @return Cache entry filter.
+     */
+    public CacheEntryEventFilter getEventFilter();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 62a4153..e61127d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryExpiredListener;
 import javax.cache.event.CacheEntryListener;
 import javax.cache.event.CacheEntryListenerException;
@@ -62,6 +63,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -99,6 +101,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /** */
     private boolean useObjects;
 
+    /** */
+    private static AtomicBoolean serialized = new AtomicBoolean(false);
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -138,6 +143,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
             assertEquals(0, syncMsgFuts.size());
         }
+
+        serialized.set(false);
     }
 
     /**
@@ -439,18 +446,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
             FactoryBuilder.factoryOf(lsnr),
-            null,
+            new SerializableFactory(),
             true,
             false
         ));
 
         try {
             startGrid(gridCount());
+
+            jcache(0).put(1, 1);
         }
         finally {
             stopGrid(gridCount());
         }
 
+        jcache(0).put(2, 2);
+
+        assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
         assertFalse(serialized.get());
     }
 
@@ -527,16 +539,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @throws Exception If failed.
      */
-    public void testEventsObjectKeyValue() throws Exception {
+    public void _testEventsObjectKeyValue() throws Exception {
         useObjects = true;
 
-        testEvents();
+        _testEvents();
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testEvents() throws Exception {
+    public void _testEvents() throws Exception {
         IgniteCache<Object, Object> cache = jcache();
 
         Map<Object, Object> vals = new HashMap<>();
@@ -1126,9 +1138,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+    private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
         /** {@inheritDoc} */
-        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+        @Override public CacheEntryEventFilter<Object, Object> create() {
             return new TestFilter();
         }
     }
@@ -1180,7 +1192,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
-    private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+    private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             assert evt != null;
@@ -1197,6 +1209,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
             return key % 2 == 0;
         }
+
+        /** {@inheritDoc} Always throws, because this filter must never be marshaled. */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Filter mustn't be marshaled.");
+        }
+
+        /** {@inheritDoc} Always throws, because this filter must never be unmarshaled. */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Filter mustn't be unmarshaled.");
+        }
     }
 
     /**
@@ -1351,6 +1373,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * Filter factory that creates a new {@link NonSerializableFilter} on every call.
+     */
+    public static class SerializableFactory implements Factory<NonSerializableFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
+     * Event filter that accepts every event and raises the {@code serialized} flag if it is ever marshaled.
+     */
+    public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            serialized.set(true); // Record the unmarshalling attempt; no state is read.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            serialized.set(true); // Record the marshalling attempt; no state is written.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+            return true;
+        }
+    }
+
+    /**
      */
     public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
index 2ff2b79..d6d30ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -21,69 +21,47 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.FactoryBuilder;
-import javax.cache.configuration.FactoryBuilder.SingletonFactory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryExpiredListener;
 import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
 import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.integration.CacheLoaderException;
-import javax.cache.integration.CacheWriterException;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.QueryTestKey;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.QueryTestValue;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
-import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.ALL;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.CLIENT;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.SERVER;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
@@ -91,10 +69,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 /**
  *
  */
-public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTest {
-    /** */
-    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
+public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
     /** */
     private static final int NODES = 5;
 
@@ -105,488 +80,10 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
     private static final int VALS = 10;
 
     /** */
-    public static final int ITERATION_CNT = 100;
-
-    /** */
-    private boolean client;
+    public static final int ITERATION_CNT = 40;
 
     /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(NODES - 1);
-
-        client = true;
-
-        startGrid(NODES - 1);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomic() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicReplicated() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
-            0,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicReplicatedAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
-            0,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicReplicatedClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
-            0,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicOffheapValues() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicOffheapValuesAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicOffheapValuesClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicOffheapTiered() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicOffheapTieredAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicOffheapTieredClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            ATOMIC,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicNoBackups() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicNoBackupsAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAtomicNoBackupsClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTx() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxExplicit() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxClientExplicit() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxReplicated() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
-            0,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxReplicatedClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
-            0,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapValues() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapValuesAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapValuesExplicit() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapValuesClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_VALUES,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapTiered() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapTieredAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapTieredClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxOffheapTieredClientExplicit() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            1,
-            TRANSACTIONAL,
-            OFFHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxNoBackups() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxNoBackupsAllNodes() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, ALL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxNoBackupsExplicit() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, SERVER);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTxNoBackupsClient() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            0,
-            TRANSACTIONAL,
-            ONHEAP_TIERED,
-            false);
-
-        testContinuousQuery(ccfg, CLIENT);
-    }
-
-    /**
-     * @param ccfg Cache configuration.
-     * @param deploy The place where continuous query will be started.
-     * @throws Exception If failed.
-     */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+    @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
         throws Exception {
         ignite(0).createCache(ccfg);
 
@@ -601,70 +98,18 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
 
             Collection<QueryCursor<?>> curs = new ArrayList<>();
 
-            if (deploy == CLIENT) {
-                ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
-
-                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
-                qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
-                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                        ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                    }
-                });
-
-                qry.setRemoteFilterFactory(new FilterFactory());
-
-                evtsQueues.add(evtsQueue);
+            Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
 
-                QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
-
-                curs.add(cur);
-            }
-            else if (deploy == SERVER) {
-                ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
-
-                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
-                qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
-                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                        ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                    }
-                });
-
-                qry.setRemoteFilterFactory(new FilterFactory());
-
-                evtsQueues.add(evtsQueue);
-
-                QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
-
-                curs.add(cur);
-            }
+            if (deploy == CLIENT)
+                evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+            else if (deploy == SERVER)
+                evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+                    rnd.nextBoolean()));
             else {
-                for (int i = 0; i < NODES - 1; i++) {
-                    ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+                boolean isSync = rnd.nextBoolean();
 
-                    final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
-                    qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
-                        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-                            for (CacheEntryEvent<?, ?> evt : evts)
-                                evtsQueue.add(evt);
-                        }
-                    });
-
-                    qry.setRemoteFilterFactory(new FilterFactory());
-
-                    evtsQueues.add(evtsQueue);
-
-                    QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry);
-
-                    curs.add(cur);
-                }
+                for (int i = 0; i < NODES - 1; i++)
+                    evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
             }
 
             ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
@@ -673,7 +118,7 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
 
             try {
                 for (int i = 0; i < ITERATION_CNT; i++) {
-                    if (i % 20 == 0)
+                    if (i % 10 == 0)
                         log.info("Iteration: " + i);
 
                     for (int idx = 0; idx < NODES; idx++)
@@ -683,6 +128,9 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
             finally {
                 for (QueryCursor<?> cur : curs)
                     cur.close();
+
+                for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+                    grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
             }
         }
         finally {
@@ -691,6 +139,60 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
     }
 
     /**
+     * @param cacheName Name of the cache to listen on.
+     * @param nodeIdx Index of the node on which the listener or query is registered.
+     * @param curs Receives the cursor of the continuous query, if one is started.
+     * @param lsnrCfgs Receives (node index, configuration) of the JCache listener, if one is registered.
+     * @return Queue that collects every event delivered to the registered listener.
+     */
+    private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+        int nodeIdx,
+        Collection<QueryCursor<?>> curs,
+        Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+        boolean sync) {
+        final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+        if (ThreadLocalRandom.current().nextBoolean()) { // Random choice: JCache listener or continuous query.
+            MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+                new MutableCacheEntryListenerConfiguration<>(
+                    FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+                        @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    }),
+                    new FilterFactory(),
+                    true, // Old value is required.
+                    sync // Used only on this JCache listener path; the continuous query path ignores it.
+                );
+
+            grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+            lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+        }
+        else {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<?, ?> evt : evts)
+                        evtsQueue.add(evt);
+                }
+            });
+
+            qry.setRemoteFilterFactory(new FilterFactory());
+
+            QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+            curs.add(cur);
+        }
+
+        return evtsQueue;
+    }
+
+    /**
      * @param rnd Random generator.
      * @param evtsQueues Events queue.
      * @param expData Expected cache data.
@@ -927,49 +429,6 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
                     break;
                 }
 
-                case 11: {
-                    SortedMap<Object, Object> vals = new TreeMap<>();
-
-                    while (vals.size() < KEYS / 5)
-                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd));
-
-                    cache.putAll(vals);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    for (Map.Entry<Object, Object> e : vals.entrySet())
-                        updatePartitionCounter(cache, e.getKey(), partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
-
-                    expData.putAll(vals);
-
-                    break;
-                }
-
-                case 12: {
-                    SortedMap<Object, Object> vals = new TreeMap<>();
-
-                    while (vals.size() < KEYS / 5)
-                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal);
-
-                    cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
-
-                    if (tx != null)
-                        tx.commit();
-
-                    for (Map.Entry<Object, Object> e : vals.entrySet())
-                        updatePartitionCounter(cache, e.getKey(), partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
-
-                    for (Object o : vals.keySet())
-                        expData.put(o, newVal);
-
-                    break;
-                }
-
                 default:
                     fail("Op:" + op);
             }
@@ -980,76 +439,6 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
     }
 
     /**
-     *  @param evtsQueues Queue.
-     * @param partCntrs Counters.
-     * @param aff Affinity.
-     * @param vals Values.
-     * @param expData Expected data.
-     */
-    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
-        Map<Integer, Long> partCntrs,
-        Affinity<Object> aff,
-        SortedMap<Object, Object> vals,
-        Map<Object, Object> expData)
-        throws Exception {
-        Map<Object, Object> vals0 = new HashMap<>(vals);
-
-        for (Map.Entry<Object, Object> e : vals0.entrySet()) {
-            if (!isAccepted((QueryTestValue)e.getValue()))
-                vals.remove(e.getKey());
-        }
-
-        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
-            Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>();
-
-            for (int i = 0; i < vals.size(); i++) {
-                CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
-
-                try {
-                    assertNotNull(evt);
-                }
-                catch (Throwable e) {
-                    int z = 0;
-
-                    ++z;
-                }
-
-                rcvEvts.put(evt.getKey(), evt);
-            }
-
-            assertEquals(vals.size(), rcvEvts.size());
-
-            for (Map.Entry<Object, Object> e : vals.entrySet()) {
-                Object key = e.getKey();
-                Object val = e.getValue();
-                Object oldVal = expData.get(key);
-
-                if (val == null && oldVal == null) {
-                    checkNoEvent(evtsQueues);
-
-                    continue;
-                }
-
-                CacheEntryEvent evt = rcvEvts.get(key);
-
-                assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']',
-                    evt);
-                assertEquals(key, evt.getKey());
-                assertEquals(val, evt.getValue());
-                assertEquals(oldVal, evt.getOldValue());
-
-                long cntr = partCntrs.get(aff.partition(key));
-                CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class);
-
-                assertNotNull(cntr);
-                assertNotNull(qryEntryEvt);
-
-                assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
-            }
-        }
-    }
-
-    /**
      * @param rnd {@link Random}.
      * @return {@link TransactionIsolation}.
      */
@@ -1153,230 +542,99 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
 
     /**
      *
-     * @param cacheMode Cache mode.
-     * @param backups Number of backups.
-     * @param atomicityMode Cache atomicity mode.
-     * @param memoryMode Cache memory mode.
-     * @param store If {@code true} configures dummy cache store.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration<Object, Object> cacheConfiguration(
-        CacheMode cacheMode,
-        int backups,
-        CacheAtomicityMode atomicityMode,
-        CacheMemoryMode memoryMode,
-        boolean store) {
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
-        ccfg.setAtomicityMode(atomicityMode);
-        ccfg.setCacheMode(cacheMode);
-        ccfg.setMemoryMode(memoryMode);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicWriteOrderMode(PRIMARY);
-
-        if (cacheMode == PARTITIONED)
-            ccfg.setBackups(backups);
-
-        if (store) {
-            ccfg.setCacheStoreFactory(new TestStoreFactory());
-            ccfg.setReadThrough(true);
-            ccfg.setWriteThrough(true);
-        }
-
-        return ccfg;
-    }
-
-    /**
-     *
-     */
-    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public CacheStore<Object, Object> create() {
-            return new CacheStoreAdapter() {
-                @Override public Object load(Object key) throws CacheLoaderException {
-                    return null;
-                }
-
-                @Override public void write(Cache.Entry entry) throws CacheWriterException {
-                    // No-op.
-                }
-
-                @Override public void delete(Object key) throws CacheWriterException {
-                    // No-op.
-                }
-            };
-        }
-    }
-
-    /**
-     *
      */
-    static class QueryTestKey implements Serializable, Comparable {
+    protected static class NonSerializableFilter
+        implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
         /** */
-        private final Integer key;
-
-        /**
-         * @param key Key.
-         */
-        public QueryTestKey(Integer key) {
-            this.key = key;
+        public NonSerializableFilter() {
+            // No-op.
         }
 
         /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            QueryTestKey that = (QueryTestKey)o;
-
-            return key.equals(that.key);
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+            throws CacheEntryListenerException {
+            return isAccepted(event.getValue());
         }
 
         /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return key.hashCode();
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
         }
 
         /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(QueryTestKey.class, this);
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fail("Entry filter should not be marshaled.");
         }
 
-        /** {@inheritDoc} */
-        @Override public int compareTo(Object o) {
-            return key - ((QueryTestKey)o).key;
+        /**
+         * @return {@code True} if value is even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            return val == null || val.val1 % 2 == 0;
         }
     }
 
     /**
      *
      */
-    static class QueryTestValue implements Serializable {
-        /** */
-        private final Integer val1;
-
-        /** */
-        private final String val2;
-
-        /**
-         * @param val Value.
-         */
-        public QueryTestValue(Integer val) {
-            this.val1 = val;
-            this.val2 = String.valueOf(val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            QueryTestValue that = (QueryTestValue) o;
-
-            return val1.equals(that.val1) && val2.equals(that.val2);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = val1.hashCode();
-
-            res = 31 * res + val2.hashCode();
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(QueryTestValue.class, this);
+    protected static class FilterFactory implements Factory<NonSerializableFilter> {
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
         }
     }
 
     /**
      *
      */
-    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+    public abstract class LocalNonSerialiseListener implements
+        CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+        CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+        Externalizable {
         /** */
-        private Object val;
-
-        /** */
-        private boolean retOld;
-
-        /**
-         * @param val Value to set.
-         * @param retOld Return old value flag.
-         */
-        public EntrySetValueProcessor(Object val, boolean retOld) {
-            this.val = val;
-            this.retOld = retOld;
+        public LocalNonSerialiseListener() {
+            // No-op.
         }
 
         /** {@inheritDoc} */
-        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
-            Object old = retOld ? e.getValue() : null;
-
-            if (val != null)
-                e.setValue(val);
-            else
-                e.remove();
-
-            return old;
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
         }
 
         /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(EntrySetValueProcessor.class, this);
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
         }
-    }
-
-    /**
-     *
-     */
-    protected enum ContinuousDeploy {
-        CLIENT, SERVER, ALL
-    }
 
-    /**
-     *
-     */
-    protected static class NonSerializableFilter
-        implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
         /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
-            throws CacheEntryListenerException {
-            return isAccepted(event.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            fail("Entry filter should not be marshaled.");
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
         }
 
         /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            fail("Entry filter should not be marshaled.");
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
         }
 
         /**
-         * @return {@code True} if value is even.
+         * @param evts Events.
          */
-        public static boolean isAccepted(QueryTestValue val) {
-            return val == null ? true : val.val1 % 2 == 0;
+        protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts);
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
         }
-    }
 
-    /**
-     *
-     */
-    protected static class FilterFactory implements Factory<NonSerializableFilter> {
-        @Override public NonSerializableFilter create() {
-            return new NonSerializableFilter();
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
new file mode 100644
index 0000000..ff8d0a7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
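+/**
+ * Checks cache entry listeners whose remote filter factory class is loaded through an external class loader.
+ */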
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+    /** IP finder handed to the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of nodes: {@code NODES - 1} are started first, the last one is started as a client. */
+    private static final int NODES = 5;
+
+    /** Unused in this class. NOTE(review): looks like a leftover from a sibling test - confirm. */
+    private static final int KEYS = 50;
+
+    /** Unused in this class. NOTE(review): looks like a leftover from a sibling test - confirm. */
+    private static final int VALS = 10;
+
+    /** Unused in this class. NOTE(review): looks like a leftover from a sibling test - confirm. */
+    public static final int ITERATION_CNT = 100;
+
+    /** Client mode flag applied by {@link #getConfiguration(String)}; switched on before the last node starts. */
+    private boolean client;
+
+    /** Adds peer class loading, the current client flag and the shared IP finder to the base configuration. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+        igniteCfg.setPeerClassLoadingEnabled(true);
+        igniteCfg.setClientMode(client);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+        discoSpi.setIpFinder(ipFinder);
+
+        return igniteCfg;
+    }
+
+    /** Starts {@code NODES - 1} nodes with the client flag unset, then one more node with the flag set. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final int srvCnt = NODES - 1;
+
+        startGridsMultiThreaded(srvCnt);
+        client = true; // Read by getConfiguration(), so the node started below is a client.
+        startGrid(srvCnt);
+    }
+
+    /** Stops all grids, then delegates to the parent cleanup. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * PARTITIONED ATOMIC cache with one backup;
+     * the listener is registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicClient() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED),
+            true
+        );
+    }
+
+    /**
+     * PARTITIONED ATOMIC cache with one backup;
+     * the listener is registered through a randomly chosen non-client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED),
+            false
+        );
+    }
+
+    /**
+     * REPLICATED ATOMIC cache;
+     * the listener is registered through a randomly chosen non-client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED),
+            false
+        );
+    }
+
+    /**
+     * REPLICATED ATOMIC cache;
+     * the listener is registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED),
+            true
+        );
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL cache with one backup;
+     * the listener is registered through a randomly chosen non-client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED),
+            false
+        );
+    }
+    /**
+     * PARTITIONED TRANSACTIONAL cache with one backup;
+     * the listener is registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED),
+            true
+        );
+    }
+
+    /**
+     * REPLICATED TRANSACTIONAL cache;
+     * the listener is registered through a randomly chosen non-client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED),
+            false
+        );
+    }
+
+    /**
+     * REPLICATED TRANSACTIONAL cache;
+     * the listener is registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        testContinuousQuery(
+            cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED),
+            true
+        );
+    }
+
+    /**
+     * Registers a listener with an externally loaded filter factory, puts keys 0..9 and awaits the latch.
+     *
+     * @param ccfg Cache configuration; the cache is created here and destroyed before the method returns.
+     * @param isClient If {@code true} the last (client) node is used, otherwise a random non-client node.
+     * @throws Exception If failed.
+     */
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        QueryCursor<?> cur = null;
+
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        CacheEntryUpdatedListener<Integer, Integer> localLsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
+                ? extends Integer>> evts) throws CacheEntryListenerException {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                    latch.countDown();
+            }
+        };
+
+        MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+            new MutableCacheEntryListenerConfiguration<>(
+                new FactoryBuilder.SingletonFactory<>(localLsnr),
+                (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+                    (Object)evtFilterFactory.newInstance(),
+                true,
+                true
+            );
+
+        qry.setLocalListener(localLsnr);
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+        IgniteCache<Integer, Integer> cache = null;
+
+        try {
+            if (isClient)
+                cache = grid(NODES - 1).cache(ccfg.getName());
+            else
+                cache = grid(ThreadLocalRandom.current().nextInt(NODES - 1)).cache(ccfg.getName());
+
+            //cur = cache.query(qry);
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, TimeUnit.SECONDS));
+        }
+        finally {
+            if (cur != null)
+                cur.close();
+
+            if (cache != null)
+                cache.deregisterCacheEntryListener(lsnrCfg);
+
+            ignite(0).destroyCache(ccfg.getName()); // Every test method creates a cache with this name.
+        }
+    }
+
+    /**
+     * Builds an unnamed cache configuration with FULL_SYNC write synchronization and PRIMARY atomic write order.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when {@code cacheMode} is {@code PARTITIONED}.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return New cache configuration; no cache name is set on it.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Serializable test key wrapping an {@link Integer}; equality, hash and ordering all use the wrapped value.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} Uses {@link Integer#compareTo(Integer)}: subtracting the keys can overflow. */
+        @Override public int compareTo(Object o) {
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+    /**
+     * Serializable test value holding an {@link Integer} together with its string form.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Value passed to the constructor. */
+        protected final Integer val1;
+
+        /** {@code String.valueOf(val1)}, computed in the constructor. */
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}