You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/07/02 13:47:20 UTC

[ignite] branch master updated: IGNITE-3653 P2P doesn't work for remote filter and filter factory - Fixes #4566.

This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cea7e2  IGNITE-3653 P2P doesn't work for remote filter and filter factory - Fixes #4566.
2cea7e2 is described below

commit 2cea7e245927e8e54b93f7adea7524ce68c6d226
Author: Denis Mekhanikov <dm...@gmail.com>
AuthorDate: Tue Jul 2 16:47:07 2019 +0300

    IGNITE-3653 P2P doesn't work for remote filter and filter factory - Fixes #4566.
    
    Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
 .../apache/ignite/cache/query/ContinuousQuery.java |  50 ++--
 .../ignite/internal/GridEventConsumeHandler.java   |  59 ++++-
 .../ignite/internal/GridMessageListenHandler.java  |  56 ++---
 .../apache/ignite/internal/IgniteEventsImpl.java   |   3 +
 .../ignite/internal/IgniteMessagingImpl.java       |   3 +
 .../CacheContinuousQueryDeployableObject.java      |   8 +
 .../continuous/CacheContinuousQueryHandler.java    | 260 +++++++++++++++-----
 .../continuous/CacheContinuousQueryHandlerV2.java  |  20 +-
 .../continuous/CacheContinuousQueryHandlerV3.java  |  22 +-
 .../continuous/CacheContinuousQueryListener.java   |   5 -
 .../continuous/CacheContinuousQueryManager.java    |  20 +-
 .../continuous/GridContinuousProcessor.java        | 266 +++++++++++----------
 .../CacheContinuousQueryLongP2PTest.java           | 154 ++++++++++++
 .../CacheContinuousQueryOperationP2PTest.java      | 166 ++++++++++---
 ...ousQueryDeserializationErrorOnNodeJoinTest.java | 110 +++++++++
 .../ContinuousQueryPeerClassLoadingTest.java       |  11 +-
 ...GridCacheContinuousQueryNodesFilteringTest.java |  20 +-
 .../testsuites/IgniteCacheQuerySelfTestSuite6.java |   2 +
 .../zk/ZookeeperDiscoverySpiTestSuite3.java        |   6 +-
 19 files changed, 908 insertions(+), 333 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 0d1444b..f898365 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -30,22 +30,22 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
 /**
  * API for configuring continuous cache queries.
  * <p>
- * Continuous queries allow to register a remote filter and a local listener
+ * Continuous queries allow registering a remote filter and a local listener
  * for cache updates. If an update event passes the filter, it will be sent to
- * the node that executed the query and local listener will be notified.
+ * the node that executed the query, and the local listener will be notified.
  * <p>
- * Additionally, you can execute initial query to get currently existing data.
+ * Additionally, you can execute an initial query to get currently existing data.
  * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)}
  * method.
  * <p>
  * Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)}
  * method, or only on the local node, if {@link Query#setLocal(boolean)} parameter is set to {@code true}.
- * Note that in case query is distributed and a new node joins, it will get the remote
- * filter for the query during discovery process before it actually joins topology,
+ * Note that if the query is distributed and a new node joins, it will get the remote
+ * filter for the query during the discovery process before it actually joins the topology,
  * so no updates will be missed.
  * <h1 class="header">Example</h1>
- * As an example, suppose we have cache with {@code 'Person'} objects and we need
- * to query all persons with salary above 1000.
+ * As an example, suppose we have a cache with {@code 'Person'} objects and we need
+ * to query for all people with salary above 1000.
  * <p>
  * Here is the {@code Person} class:
  * <pre name="code" class="java">
@@ -60,17 +60,17 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
  * }
  * </pre>
  * <p>
- * You can create and execute continuous query like so:
+ * You can create and execute a continuous query like so:
  * <pre name="code" class="java">
- * // Create new continuous query.
+ * // Create a new continuous query.
  * ContinuousQuery&lt;Long, Person&gt; qry = new ContinuousQuery&lt;&gt;();
  *
- * // Initial iteration query will return all persons with salary above 1000.
+ * // Initial iteration query will return all people with salary above 1000.
  * qry.setInitialQuery(new ScanQuery&lt;&gt;((id, p) -> p.getSalary() &gt; 1000));
  *
  *
  * // Callback that is called locally when update notifications are received.
- * // It simply prints out information about all created persons.
+ * // It simply prints out information about all created or modified records.
  * qry.setLocalListener((evts) -> {
  *     for (CacheEntryEvent&lt;? extends Long, ? extends Person&gt; e : evts) {
  *         Person p = e.getValue();
@@ -79,29 +79,29 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
  *     }
  * });
  *
- * // Continuous listener will be notified for persons with salary above 1000.
+ * // The continuous listener will be notified for people with salary above 1000.
  * qry.setRemoteFilter(evt -> evt.getValue().getSalary() &gt; 1000);
  *
- * // Execute query and get cursor that iterates through initial data.
+ * // Execute the query and get a cursor that iterates through the initial data.
  * QueryCursor&lt;Cache.Entry&lt;Long, Person&gt;&gt; cur = cache.query(qry);
  * </pre>
- * This will execute query on all nodes that have cache you are working with and
- * listener will start to receive notifications for cache updates.
+ * This will execute the query on all nodes that have the cache you are working with, and
+ * the listener will start receiving notifications for cache updates.
  * <p>
  * To stop receiving updates call {@link QueryCursor#close()} method:
  * <pre name="code" class="java">
  * cur.close();
  * </pre>
- * Note that this works even if you didn't provide initial query. Cursor will
+ * Note that this works even if you didn't provide the initial query. Cursor will
  * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
  * is called.
  * <p>
  * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
  * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
  * (see {@link #setLocalListener(CacheEntryUpdatedListener)}).
- * If filter and/or listener are annotated with {@link IgniteAsyncCallback} then annotated callback
- * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
- * and notification order is kept the same as update order for given cache key.
+ * If the filter and/or listener is annotated with {@link IgniteAsyncCallback}, then the annotated callback
+ * is executed in the async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}),
+ * and the notification order is kept the same as the update order for a given cache key.
  *
  * @see ContinuousQueryWithTransformer
  * @see IgniteAsyncCallback
@@ -130,10 +130,10 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
     }
 
     /**
-     * Sets local callback. This callback is called only in local node when new updates are received.
+     * Sets a local callback. This callback is called only on the local node when new updates are received.
      * <p>
-     * The callback predicate accepts ID of the node from where updates are received and collection
-     * of received entries. Note that for removed entries value will be {@code null}.
+     * The callback predicate accepts the ID of the node from which updates are received and a collection
+     * of the received entries. Note that for removed entries the value will be {@code null}.
      * <p>
      * If the predicate returns {@code false}, query execution will be cancelled.
      * <p>
@@ -141,7 +141,7 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
      * synchronization or transactional cache operations), should be executed asynchronously without
      * blocking the thread that called the callback. Otherwise, you can get deadlocks.
      * <p>
-     * If local listener are annotated with {@link IgniteAsyncCallback} then it is executed in async callback pool
+     * If the local listener is annotated with {@link IgniteAsyncCallback} then it is executed in an async callback pool
      * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
      *
      * @param locLsnr Local callback.
@@ -157,8 +157,6 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
     }
 
     /**
-     * Gets local listener.
-     *
      * @return Local listener.
      */
     public CacheEntryUpdatedListener<K, V> getLocalListener() {
@@ -214,7 +212,7 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
     }
 
     /**
-     * Sets whether this query should be executed on local node only.
+     * Sets whether this query should be executed on the local node only.
      *
      * Note: backup event queues are not kept for local continuous queries. It may lead to loss of notifications in case
      * of node failures. Use {@link ContinuousQuery#setRemoteFilterFactory(Factory)} to register cache event listeners
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e15cdd0..017a1b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -92,6 +94,9 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     /** Listener. */
     private GridLocalEventListener lsnr;
 
+    /** P2P unmarshalling future. */
+    private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -147,6 +152,21 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         return Collections.emptyMap();
     }
 
+    /**
+     * Performs remote filter initialization.
+     *
+     * @param filter Remote filter.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException In case if initialization failed.
+     */
+    private void initFilter(IgnitePredicate<Event> filter, GridKernalContext ctx) throws IgniteCheckedException {
+        if (filter != null)
+            ctx.resource().injectGeneric(filter);
+
+        if (filter instanceof PlatformEventFilterListener)
+            ((PlatformEventFilterListener)filter).initialize(ctx);
+    }
+
     /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
@@ -157,12 +177,6 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         if (cb != null)
             ctx.resource().injectGeneric(cb);
 
-        if (filter != null)
-            ctx.resource().injectGeneric(filter);
-
-        if (filter instanceof PlatformEventFilterListener)
-            ((PlatformEventFilterListener)filter).initialize(ctx);
-
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         lsnr = new GridLocalEventListener() {
@@ -262,7 +276,18 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         if (F.isEmpty(types))
             types = EVTS_ALL;
 
-        ctx.event().addLocalEventListener(lsnr, types);
+        p2pUnmarshalFut.listen((fut) -> {
+            if (fut.error() == null) {
+                try {
+                    initFilter(filter, ctx);
+                }
+                catch (IgniteCheckedException e) {
+                    throw F.wrap(e);
+                }
+
+                ctx.event().addLocalEventListener(lsnr, types);
+            }
+        });
 
         return RegisterStatus.REGISTERED;
     }
@@ -387,13 +412,22 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         assert ctx.config().isPeerClassLoadingEnabled();
 
         if (filterBytes != null) {
-            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+            try {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+                    depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
 
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+                if (dep == null)
+                    throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+                filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
 
-            filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                ((GridFutureAdapter)p2pUnmarshalFut).onDone();
+            }
+            catch (IgniteCheckedException e) {
+                ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
+
+                throw e;
+            }
         }
     }
 
@@ -454,6 +488,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         boolean b = in.readBoolean();
 
         if (b) {
+            p2pUnmarshalFut = new GridFutureAdapter<>();
             filterBytes = U.readByteArray(in);
             clsName = U.readString(in);
             depInfo = (GridDeploymentInfo)in.readObject();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index a98ed50..83de474 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -26,12 +26,15 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -67,6 +70,9 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     /** */
     private boolean depEnabled;
 
+    /** P2P unmarshalling future. */
+    private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -85,22 +91,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
         this.pred = pred;
     }
 
-    /**
-     *
-     * @param orig Handler to be copied.
-     */
-    public GridMessageListenHandler(GridMessageListenHandler orig) {
-        assert orig != null;
-
-        this.clsName = orig.clsName;
-        this.depInfo = orig.depInfo;
-        this.pred = orig.pred;
-        this.predBytes = orig.predBytes;
-        this.topic = orig.topic;
-        this.topicBytes = orig.topicBytes;
-        this.depEnabled = false;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;
@@ -138,9 +128,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
-        throws IgniteCheckedException {
-        ctx.io().addUserMessageListener(topic, pred, nodeId);
+    @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) {
+        p2pUnmarshalFut.listen((fut) -> {
+            if (fut.error() == null)
+                ctx.io().addUserMessageListener(topic, pred, nodeId);
+        });
 
         return RegisterStatus.REGISTERED;
     }
@@ -186,18 +178,27 @@ public class GridMessageListenHandler implements GridContinuousHandler {
         assert ctx != null;
         assert ctx.config().isPeerClassLoadingEnabled();
 
-        GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-            depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+        try {
+            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
 
-        if (dep == null)
-            throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+            if (dep == null)
+                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+            ClassLoader ldr = dep.classLoader();
 
-        ClassLoader ldr = dep.classLoader();
+            if (topicBytes != null)
+                topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.config()));
 
-        if (topicBytes != null)
-            topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.config()));
+            pred = U.unmarshal(ctx, predBytes, U.resolveClassLoader(ldr, ctx.config()));
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
+
+            throw e;
+        }
 
-        pred = U.unmarshal(ctx, predBytes, U.resolveClassLoader(ldr, ctx.config()));
+        ((GridFutureAdapter)p2pUnmarshalFut).onDone();
     }
 
     /** {@inheritDoc} */
@@ -256,6 +257,7 @@ public class GridMessageListenHandler implements GridContinuousHandler {
         depEnabled = in.readBoolean();
 
         if (depEnabled) {
+            p2pUnmarshalFut = new GridFutureAdapter<>();
             topicBytes = U.readByteArray(in);
             predBytes = U.readByteArray(in);
             clsName = U.readString(in);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index c91792d..d683ecb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -172,6 +172,9 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
                 autoUnsubscribe,
                 prj.predicate()));
         }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
         finally {
             unguard();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index e86e83a..6719379 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -241,6 +241,9 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
                 false,
                 prj.predicate()));
         }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
         finally {
             unguard();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
index f888467..c4e4005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -38,6 +40,7 @@ class CacheContinuousQueryDeployableObject implements Externalizable {
     private static final long serialVersionUID = 0L;
 
     /** Serialized object. */
+    @GridToStringExclude
     private byte[] bytes;
 
     /** Deployment class name. */
@@ -107,4 +110,9 @@ class CacheContinuousQueryDeployableObject implements Externalizable {
         clsName = U.readString(in);
         depInfo = (GridDeploymentInfo)in.readObject();
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryDeployableObject.class, this);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 19eb3bb..34f66b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -33,10 +33,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -54,8 +56,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
@@ -63,6 +65,8 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -114,6 +118,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** Topic for ordered messages. */
     private Object topic;
 
+    /** P2P unmarshalling future. */
+    protected transient IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
+
+    /** Initialization future. */
+    protected transient IgniteInternalFuture<Void> initFut;
+
     /** Local listener. */
     private transient CacheEntryUpdatedListener<K, V> locLsnr;
 
@@ -323,37 +333,26 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert routineId != null;
         assert ctx != null;
 
-        if (locLsnr != null) {
-            if (locLsnr instanceof JCacheQueryLocalListener) {
-                ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl);
-
-                asyncCb = ((JCacheQueryLocalListener)locLsnr).async();
-            }
-            else {
-                ctx.resource().injectGeneric(locLsnr);
-
-                asyncCb = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
-            }
-        }
-
-        final CacheEntryEventFilter filter = getEventFilter();
+        initLocalListener(locLsnr, ctx);
 
-        if (filter != null) {
-            if (filter instanceof JCacheQueryRemoteFilter) {
-                if (((JCacheQueryRemoteFilter)filter).impl != null)
-                    ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl);
+        if (initFut == null) {
+            initFut = p2pUnmarshalFut.chain((fut) -> {
+                try {
+                    fut.get();
 
-                if (!asyncCb)
-                    asyncCb = ((JCacheQueryRemoteFilter)filter).async();
-            }
-            else {
-                ctx.resource().injectGeneric(filter);
+                    initRemoteFilter(getEventFilter0(), ctx);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException("Failed to initialize a remote filter.", e);
+                }
 
-                if (!asyncCb)
-                    asyncCb = U.hasAnnotation(filter, IgniteAsyncCallback.class);
-            }
+                return null;
+            });
         }
 
+        if (initFut.error() != null)
+            throw new IgniteCheckedException("Failed to initialize a continuous query.", initFut.error());
+
         entryBufs = new ConcurrentHashMap<>();
 
         ackBuf = new CacheContinuousQueryAcknowledgeBuffer();
@@ -373,29 +372,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);
 
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
-            @Override public void onExecution() {
-                GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-                if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                    //noinspection unchecked
-                    ctx.event().record(new CacheQueryExecutedEvent<>(
-                        ctx.discovery().localNode(),
-                        "Continuous query executed.",
-                        EVT_CACHE_QUERY_EXECUTED,
-                        CacheQueryType.CONTINUOUS.name(),
-                        cacheName,
-                        null,
-                        null,
-                        null,
-                        filter instanceof CacheEntryEventSerializableFilter ?
-                            (CacheEntryEventSerializableFilter)filter : null,
-                        null,
-                        nodeId,
-                        taskName()
-                    ));
-                }
-            }
-
             @Override public void onRegister() {
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
@@ -453,8 +429,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onUnregister() {
-                if (filter instanceof PlatformContinuousQueryFilter)
-                    ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
+                try {
+                    CacheEntryEventFilter filter = getEventFilter();
+
+                    if (filter instanceof PlatformContinuousQueryFilter)
+                        ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Failed to execute the onUnregister callback " +
+                            "on the continuoue query listener. " +
+                            "[nodeId=" + nodeId + ", routineId=" + routineId + ", cacheName=" + cacheName +
+                            ", err=" + e + "]");
+                    }
+                }
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -637,13 +625,112 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (mgr == null)
             return RegisterStatus.DELAYED;
 
-        return mgr.registerListener(routineId, lsnr, internal);
+        RegisterStatus regStatus = mgr.registerListener(routineId, lsnr, internal);
+
+        if (regStatus == RegisterStatus.REGISTERED)
+            initFut.listen(res -> sendQueryExecutedEvent());
+
+        return regStatus;
+    }
+
+    /**
+     * Records the continuous query execution event if that event type is recordable for the cache.
+     * @see org.apache.ignite.events.EventType#EVT_CACHE_QUERY_EXECUTED
+     */
+    private void sendQueryExecutedEvent() {
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        CacheEntryEventFilter filter;
+        try {
+            filter = getEventFilter();
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to trigger the continuous query executed event. " +
+                    "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]");
+            }
+
+            return;
+        }
+
+        if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+            //noinspection unchecked
+            ctx.event().record(new CacheQueryExecutedEvent<K, V>(
+                ctx.discovery().localNode(),
+                "Continuous query executed.",
+                EVT_CACHE_QUERY_EXECUTED,
+                CacheQueryType.CONTINUOUS.name(),
+                cacheName,
+                null,
+                null,
+                null,
+                filter instanceof CacheEntryEventSerializableFilter ?
+                    (CacheEntryEventSerializableFilter)filter : null,
+                null,
+                nodeId,
+                taskName()
+            ));
+        }
+    }
+
+    /**
+     * Performs resource injection and checks asynchrony for the provided local listener.
+     *
+     * @param lsnr Local listener.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException If failed to perform resource injection.
+     */
+    private void initLocalListener(CacheEntryListener lsnr, GridKernalContext ctx) throws IgniteCheckedException {
+        if (lsnr != null) {
+            CacheEntryListener impl =
+                lsnr instanceof JCacheQueryLocalListener
+                    ? ((JCacheQueryLocalListener)lsnr).impl
+                    : lsnr;
+
+            ctx.resource().injectGeneric(impl);
+
+            asyncCb = U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
+    }
+
+    /**
+     * Performs resource injection and checks asynchrony for the provided remote filter.
+     *
+     * @param filter Remote filter.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException If failed to perform resource injection.
+     */
+    protected void initRemoteFilter(CacheEntryEventFilter filter, GridKernalContext ctx) throws IgniteCheckedException {
+        CacheEntryEventFilter impl =
+            filter instanceof JCacheQueryRemoteFilter
+                ? ((JCacheQueryRemoteFilter)filter).impl
+                : filter;
+
+        if (impl != null) {
+            ctx.resource().injectGeneric(impl);
+
+            if (!asyncCb)
+                asyncCb = U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
+    }
+
+    /**
+     * @return Cache entry event filter.
+     *
+     * @throws IgniteCheckedException If P2P unmarshalling failed.
+     */
+    public CacheEntryEventFilter getEventFilter() throws IgniteCheckedException {
+        initFut.get();
+
+        return getEventFilter0();
     }
 
     /**
+     * Returns an event filter without waiting on the unmarshalling future.
+     *
      * @return Cache entry event filter.
      */
-    public CacheEntryEventFilter getEventFilter() {
+    protected CacheEntryEventFilter getEventFilter0() {
         return rmtFilter;
     }
 
@@ -868,7 +955,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /**
      * @param evt Query event.
-     * @return {@code True} if event passed filter otherwise {@code true}.
+     * @return {@code True} if event passed filter otherwise {@code false}.
      */
     public boolean filter(CacheContinuousQueryEvent evt) {
         CacheContinuousQueryEntry entry = evt.entry();
@@ -895,7 +982,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param loc Listener deployed on this node.
      * @param recordIgniteEvt Record ignite event.
      */
-    private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) {
+    private void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt,
+        boolean notify, boolean loc, boolean recordIgniteEvt) {
         try {
             GridCacheContext<K, V> cctx = cacheContext(ctx);
 
@@ -948,8 +1036,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         if (recordIgniteEvt && notify) {
+            CacheEntryEventFilter filter;
+            try {
+                filter = getEventFilter();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to trigger a continuoue query event. " +
+                        "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]");
+                }
+
+                return;
+            }
+
             //noinspection unchecked
-            ctx.event().record(new CacheQueryReadEvent<>(
+            ctx.event().record(new CacheQueryReadEvent<K, V>(
                 ctx.discovery().localNode(),
                 "Continuous query executed.",
                 EVT_CACHE_QUERY_OBJECT_READ,
@@ -958,8 +1059,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 null,
                 null,
                 null,
-                getEventFilter() instanceof CacheEntryEventSerializableFilter ?
-                    (CacheEntryEventSerializableFilter)getEventFilter() : null,
+                filter instanceof CacheEntryEventSerializableFilter ?
+                    (CacheEntryEventSerializableFilter)filter : null,
                 null,
                 nodeId,
                 taskName(),
@@ -1153,7 +1254,41 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert ctx.config().isPeerClassLoadingEnabled();
 
         if (rmtFilterDep != null)
-            rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx);
+            rmtFilter = p2pUnmarshal(rmtFilterDep, nodeId, ctx);
+
+        if (!p2pUnmarshalFut.isDone())
+            ((GridFutureAdapter)p2pUnmarshalFut).onDone();
+    }
+
+    /**
+     * @return Whether the handler is marshalled for peer class loading.
+     */
+    public boolean isMarshalled() {
+        return rmtFilter == null || U.isGrid(rmtFilter.getClass()) || rmtFilterDep != null;
+    }
+
+    /**
+     * @param depObj Deployable object to unmarshal, may be {@code null}.
+     * @param nodeId Sender node Id.
+     * @param ctx Kernal context.
+     * @param <T> Result type.
+     * @return Unmarshalled object, or {@code null} if {@code depObj} is {@code null}.
+     * @throws IgniteCheckedException On unmarshalling failure ({@code p2pUnmarshalFut} is completed with it first).
+     */
+    protected <T> T p2pUnmarshal(CacheContinuousQueryDeployableObject depObj,
+        UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        if (depObj != null) {
+            try {
+                return depObj.unmarshal(nodeId, ctx);
+            }
+            catch (IgniteCheckedException e) {
+                ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
+
+                throw e;
+            }
+        }
+        else
+            return null;
     }
 
     /** {@inheritDoc} */
@@ -1262,8 +1397,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         boolean b = in.readBoolean();
 
-        if (b)
+        if (b) {
             rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject();
+
+            p2pUnmarshalFut = new GridFutureAdapter<>();
+        }
         else
             rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 86c1ae1..1d968c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -99,7 +100,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     }
 
     /** {@inheritDoc} */
-    @Override public CacheEntryEventFilter getEventFilter() {
+    @Override protected CacheEntryEventFilter getEventFilter0() {
         if (filter == null) {
             assert rmtFilterFactory != null;
 
@@ -124,10 +125,16 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
 
     /** {@inheritDoc} */
     @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        if (rmtFilterFactoryDep != null)
+            rmtFilterFactory = p2pUnmarshal(rmtFilterFactoryDep, nodeId, ctx);
+
         super.p2pUnmarshal(nodeId, ctx);
+    }
 
-        if (rmtFilterFactoryDep != null)
-            rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
+    /** {@inheritDoc} */
+    @Override public boolean isMarshalled() {
+        return super.isMarshalled() &&
+            (rmtFilterFactory == null || U.isGrid(rmtFilterFactory.getClass()) || rmtFilterFactoryDep != null);
     }
 
     /** {@inheritDoc} */
@@ -163,9 +170,12 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
 
         boolean b = in.readBoolean();
 
-        if (b)
+        if (b) {
             rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
-        else
+
+            if (p2pUnmarshalFut.isDone())
+                p2pUnmarshalFut = new GridFutureAdapter<>();
+        } else
             rmtFilterFactory = (Factory)in.readObject();
 
         types = in.readByte();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
index 0008cfc..552e87d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
 import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
@@ -114,11 +115,11 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan
     }
 
     /** {@inheritDoc} */
-    @Override public CacheEntryEventFilter<K, V> getEventFilter() {
+    @Override protected CacheEntryEventFilter getEventFilter0() {
         if (rmtFilterFactory == null)
             return null;
 
-        return super.getEventFilter();
+        return super.getEventFilter0();
     }
 
     /** {@inheritDoc} */
@@ -148,10 +149,16 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan
 
     /** {@inheritDoc} */
     @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        if (rmtTransFactoryDep != null)
+            rmtTransFactory = p2pUnmarshal(rmtTransFactoryDep, nodeId, ctx);
+
         super.p2pUnmarshal(nodeId, ctx);
+    }
 
-        if (rmtTransFactoryDep != null)
-            rmtTransFactory = rmtTransFactoryDep.unmarshal(nodeId, ctx);
+    /** {@inheritDoc} */
+    @Override public boolean isMarshalled() {
+        return super.isMarshalled() &&
+            (rmtTransFactory == null || U.isGrid(rmtTransFactory.getClass()) || rmtTransFactoryDep != null);
     }
 
     /** {@inheritDoc} */
@@ -174,9 +181,12 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan
 
         boolean b = in.readBoolean();
 
-        if (b)
+        if (b) {
             rmtTransFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
-        else
+
+            if (p2pUnmarshalFut.isDone())
+                p2pUnmarshalFut = new GridFutureAdapter<>();
+        } else
             rmtTransFactory = (Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a652c51..e534fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -30,11 +30,6 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface CacheContinuousQueryListener<K, V> {
     /**
-     * Query execution callback.
-     */
-    public void onExecution();
-
-    /**
      * Entry update callback.
      *
      * @param evt Event
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0ae980a..d2fecbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -73,7 +73,6 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -840,7 +839,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                     cctx.kernalContext().cache().jcache(cctx.name()),
                                     cctx, entry);
 
-                                if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
+                                if (!hnd.filter(next))
                                     next = null;
                             }
                         }
@@ -955,9 +954,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             finally {
                 cctx.group().listenerLock().writeLock().unlock();
             }
-
-            if (added)
-                lsnr.onExecution();
         }
 
         return added ? GridContinuousHandler.RegisterStatus.REGISTERED
@@ -1212,13 +1208,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             return evts;
         }
 
-        /**
-         * @return {@code True} if listener should be executed in non-system thread.
-         */
-        protected boolean async() {
-            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
-        }
-
         /** {@inheritDoc} */
         @Override public void close() throws IOException {
             if (impl instanceof Closeable)
@@ -1286,13 +1275,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         /**
-         * @return {@code True} if filter should be executed in non-system thread.
-         */
-        protected boolean async() {
-            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
-        }
-
-        /**
          * @param evtType Type.
          * @return Flag value.
          */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index bbd2290..f6bd571 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -44,8 +44,9 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridMessageListenHandler;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -444,12 +445,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 UUID routineId = e.getKey();
                 LocalRoutineInfo info = e.getValue();
 
+                assert !ctx.config().isPeerClassLoadingEnabled() ||
+                    !(info.hnd instanceof CacheContinuousQueryHandler) ||
+                    ((CacheContinuousQueryHandler)info.hnd).isMarshalled();
+
                 data.addItem(new DiscoveryDataItem(routineId,
-                        info.prjPred,
-                        info.hnd,
-                        info.bufSize,
-                        info.interval,
-                        info.autoUnsubscribe));
+                    info.prjPred,
+                    info.hnd,
+                    info.bufSize,
+                    info.interval,
+                    info.autoUnsubscribe));
             }
 
             return data;
@@ -505,13 +510,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) {
                     routinesInfo.addRoutineInfo(routineInfo);
 
-                    startDiscoveryDataRoutine(routineInfo);
+                    onDiscoveryDataReceivedV2(routineInfo);
                 }
             }
         }
         else {
             if (data.hasJoiningNodeData())
-                onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+                onDiscoveryDataReceivedV1((DiscoveryData) data.joiningNodeData());
         }
     }
 
@@ -531,7 +536,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                     routinesInfo.addRoutineInfo(routineInfo);
 
-                    startDiscoveryDataRoutine(routineInfo);
+                    onDiscoveryDataReceivedV2(routineInfo);
                 }
             }
         }
@@ -540,15 +545,62 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             if (nodeSpecData != null) {
                 for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
-                    onDiscoDataReceived((DiscoveryData) e.getValue());
+                    onDiscoveryDataReceivedV1((DiscoveryData) e.getValue());
+            }
+        }
+    }
+
+    /**
+     * Processes data received in a discovery message.
+     * Used with protocol version 1.
+     *
+     * @param data Received discovery data; skipped if {@code null} or if the local node is a daemon.
+     */
+    private void onDiscoveryDataReceivedV1(DiscoveryData data) {
+        if (!ctx.isDaemon() && data != null) {
+            for (DiscoveryDataItem item : data.items) {
+                if (!locInfos.containsKey(item.routineId)) {
+                    registerHandlerOnJoin(data.nodeId, item.routineId, item.prjPred,
+                        item.hnd, item.bufSize, item.interval, item.autoUnsubscribe);
+                }
+
+                if (!item.autoUnsubscribe) {
+                    locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
+                        item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
+                }
+            }
+
+            // Process CQs started on clients.
+            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+                UUID clientNodeId = entry.getKey();
+
+                if (!ctx.localNodeId().equals(clientNodeId)) {
+                    Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+
+                    for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+                        UUID routineId = e.getKey();
+                        LocalRoutineInfo info = e.getValue();
+
+                        registerHandlerOnJoin(clientNodeId, routineId, info.prjPred,
+                            info.hnd, info.bufSize, info.interval, info.autoUnsubscribe);
+                    }
+                }
+
+                Map<UUID, LocalRoutineInfo> map =
+                    clientInfos.computeIfAbsent(clientNodeId, k -> new HashMap<>());
+
+                map.putAll(entry.getValue());
             }
         }
     }
 
     /**
+     * Processes data received in a discovery message.
+     * Used with protocol version 2.
+     *
      * @param routineInfo Routine info.
      */
-    private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) {
+    private void onDiscoveryDataReceivedV2(ContinuousRoutineInfo routineInfo) {
         IgnitePredicate<ClusterNode> nodeFilter;
 
         try {
@@ -561,122 +613,94 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 nodeFilter = null;
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [" +
+            U.error(log, "Failed to unmarshal continuous routine filter [" +
                 "routineId=" + routineInfo.routineId +
                 ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
 
+            ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
             return;
         }
 
-        ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> {
-            if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) {
-                GridContinuousHandler hnd;
+        GridContinuousHandler hnd;
 
-                try {
-                    hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config()));
+        try {
+            hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config()));
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to unmarshal continuous routine handler [" +
+                "routineId=" + routineInfo.routineId +
+                ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
 
-                    if (ctx.config().isPeerClassLoadingEnabled())
-                        hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" +
-                        "routineId=" + routineInfo.routineId +
-                        ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+            ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
-                    return;
-                }
+            return;
+        }
 
-                try {
-                    registerHandler(routineInfo.srcNodeId,
-                        routineInfo.routineId,
+        registerHandlerOnJoin(routineInfo.srcNodeId, routineInfo.routineId, nodeFilter,
+            hnd, routineInfo.bufSize, routineInfo.interval, routineInfo.autoUnsubscribe);
+    }
+
+    /**
+     * Register a continuous query handler on local node join.
+     *
+     * @param srcNodeId Id of the subscriber node.
+     * @param routineId Routine id.
+     * @param nodeFilter Node filter.
+     * @param hnd Continuous query handler.
+     * @param bufSize Buffer size.
+     * @param interval Time interval for buffer checker.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
+     */
+    private void registerHandlerOnJoin(UUID srcNodeId, UUID routineId, IgnitePredicate<ClusterNode> nodeFilter,
+        GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe) {
+
+        try {
+            if (nodeFilter != null)
+                ctx.resource().injectGeneric(nodeFilter);
+
+            if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) {
+                    registerHandler(srcNodeId,
+                        routineId,
                         hnd,
-                        routineInfo.bufSize,
-                        routineInfo.interval,
-                        routineInfo.autoUnsubscribe,
+                        bufSize,
+                        interval,
+                        autoUnsubscribe,
                         false);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to register continuous routine handler, ignore routine [" +
-                        "routineId=" + routineInfo.routineId +
-                        ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
-                }
             }
             else {
                 if (log.isDebugEnabled()) {
                     log.debug("Do not register continuous routine, rejected by node filter [" +
-                        "routineId=" + routineInfo.routineId +
-                        ", srcNodeId=" + routineInfo.srcNodeId + ']');
-                }
-            }
-        }));
-    }
-
-    /**
-     * @param data received discovery data.
-     */
-    private void onDiscoDataReceived(DiscoveryData data) {
-        if (!ctx.isDaemon() && data != null) {
-            for (DiscoveryDataItem item : data.items) {
-                try {
-                    if (item.prjPred != null)
-                        ctx.resource().injectGeneric(item.prjPred);
-
-                    // Register handler only if local node passes projection predicate.
-                    if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
-                            !locInfos.containsKey(item.routineId))
-                        registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
-                                item.autoUnsubscribe, false);
-
-                    if (!item.autoUnsubscribe)
-                        // Register routine locally.
-                        locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
-                                item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to register continuous handler.", e);
+                        "routineId=" + routineId +
+                        ", srcNodeId=" + srcNodeId + ']');
                 }
             }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to register continuous routine handler [" +
+                "routineId=" + routineId +
+                ", srcNodeId=" + srcNodeId + ']', e);
 
-            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
-                UUID clientNodeId = entry.getKey();
-
-                if (!ctx.clientNode()) {
-                    Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
-
-                    for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
-                        UUID routineId = e.getKey();
-                        LocalRoutineInfo info = e.getValue();
-
-                        try {
-                            if (info.prjPred != null)
-                                ctx.resource().injectGeneric(info.prjPred);
+            ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+        }
 
-                            if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
-                                registerHandler(clientNodeId,
-                                        routineId,
-                                        info.hnd,
-                                        info.bufSize,
-                                        info.interval,
-                                        info.autoUnsubscribe,
-                                        false);
-                            }
-                        }
-                        catch (IgniteCheckedException err) {
-                            U.error(log, "Failed to register continuous handler.", err);
-                        }
-                    }
+        if (ctx.config().isPeerClassLoadingEnabled()) {
+            // Peer class loading cannot be performed before a node joins, so we delay the deployment.
+            // Run the deployment task in the system pool to avoid blocking of the discovery thread.
+            ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> {
+                try {
+                    hnd.p2pUnmarshal(srcNodeId, ctx);
                 }
+                catch (IgniteCheckedException | IgniteException e) {
+                    U.error(log, "Failed to unmarshal continuous routine handler [" +
+                        "routineId=" + routineId +
+                        ", srcNodeId=" + srcNodeId + ']', e);
 
-                Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
+                    ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
-                if (map == null) {
-                    map = new HashMap<>();
-
-                    clientInfos.put(entry.getKey(), map);
+                    unregisterHandler(routineId, hnd, false);
                 }
-
-                map.putAll(entry.getValue());
-            }
+            }));
         }
     }
 
@@ -721,6 +745,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * Registers routine info to be sent in discovery data during this node join
      * (to be used for internal queries started from client nodes).
      *
+     * Peer class loading is not applied to static routines.
+     *
      * @param cacheName Cache name.
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
@@ -818,7 +844,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         int bufSize,
         long interval,
         boolean autoUnsubscribe,
-        @Nullable IgnitePredicate<ClusterNode> prjPred) {
+        @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
         assert hnd != null;
         assert bufSize > 0;
         assert interval >= 0;
@@ -826,6 +852,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         // Generate ID.
         final UUID routineId = UUID.randomUUID();
 
+        if (ctx.config().isPeerClassLoadingEnabled()) {
+            hnd.p2pMarshal(ctx);
+
+            assert !(hnd instanceof CacheContinuousQueryHandler) || ((CacheContinuousQueryHandler)hnd).isMarshalled();
+        }
+
         // Register routine locally.
         locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
 
@@ -1369,10 +1401,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
         }
 
-        GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
-            new GridMessageListenHandler((GridMessageListenHandler)hnd) :
-            hnd;
-
         if (node.isClient()) {
             Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
 
@@ -1385,7 +1413,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
-                hnd0,
+                hnd,
                 data.bufferSize(),
                 data.interval(),
                 data.autoUnsubscribe()));
@@ -1400,13 +1428,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
                     !locInfos.containsKey(routineId))
-                    registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
+                    registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
 
                 if (!data.autoUnsubscribe())
                     // Register routine locally.
                     locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
-                        prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
+                        prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -1416,14 +1444,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd0.isQuery()) {
+        if (hnd.isQuery()) {
             GridCacheProcessor proc = ctx.cache();
 
             if (proc != null) {
-                GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(), hnd0.updateCounters());
+                    req.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
             }
         }
 
@@ -1544,23 +1572,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                             ((CacheContinuousQueryHandler)hnd).keepBinary(true);
                         }
 
-                        GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
-                            new GridMessageListenHandler((GridMessageListenHandler)hnd) :
-                            hnd;
-
                         registerHandler(snd.id(),
                             msg.routineId,
-                            hnd0,
+                            hnd,
                             reqData.bufferSize(),
                             reqData.interval(),
                             reqData.autoUnsubscribe(),
                             false);
 
-                        if (hnd0.isQuery()) {
+                        if (hnd.isQuery()) {
                             GridCacheProcessor proc = ctx.cache();
 
                             if (proc != null) {
-                                GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
+                                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
 
                                 if (cache != null && !cache.isLocal() && cache.context().userCache()) {
                                     CachePartitionPartialCountersMap cntrsMap =
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
new file mode 100644
index 0000000..b4ce91f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Checks that slow peer class loading of a continuous query remote filter doesn't block a joining node.
+ */
+public class CacheContinuousQueryLongP2PTest extends CacheContinuousQueryOperationP2PTest {
+    /** Delay, in milliseconds, applied to every p2p deployment response sent by the test nodes. */
+    private static volatile int delay;
+
+    /** {@inheritDoc} */
+    @Override protected CommunicationSpi communicationSpi() {
+        return new P2PDelayingCommunicationSpi();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        delay = 300;
+
+        super.beforeTest();
+    }
+
+    /**
+     * Checks that a node start is not blocked by peer class loading of the continuous query remote filter.
+     *
+     * @throws Exception If failed.
+     */
+    @Test(timeout = 60_000)
+    public void testLongP2PClassLoadingDoesntBlockNodeStart() throws Exception {
+        delay = 3_000; // Longer than the node start timeout used below.
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, ATOMIC);
+        IgniteCache<Integer, Integer> cache = grid(NODES - 1).getOrCreateCache(ccfg.getName());
+
+        ContinuousQuery<Integer, Integer> qry = continuousQuery();
+
+        cache.query(qry);
+
+        AtomicReference<String> err = new AtomicReference<>();
+
+        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(() -> {
+            try {
+                startGrid(NODES);
+            }
+            catch (Exception e) {
+                err.set(e.toString()); // Never null, unlike getMessage(), so the failure is always detected.
+
+                log.error("Failed to start a node.", e);
+            }
+        });
+
+        startFut.get(1, TimeUnit.SECONDS);
+
+        assertNull("Error occurred when starting a node: " + err.get(), err.get());
+    }
+
+    /**
+     * @return Continuous query with remote filter from an external class loader.
+     * @throws Exception If failed.
+     */
+    private ContinuousQuery<Integer, Integer> continuousQuery() throws Exception {
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener((evt) -> {});
+
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactoryCls.newInstance());
+
+        return qry;
+    }
+
+    /**
+     * Communication SPI that delays sending of p2p deployment responses.
+     */
+    private static class P2PDelayingCommunicationSpi extends TcpCommunicationSpi {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+            throws IgniteSpiException {
+            if (msg instanceof GridIoMessage && isDeploymentResponse((GridIoMessage)msg)) {
+                log.info(">>> Delaying deployment message: " + msg);
+
+                try {
+                    Thread.sleep(delay);
+                }
+                catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller.
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * Checks if it is a p2p deployment response.
+         *
+         * @param msg Message to check.
+         * @return {@code True} if this is a p2p response.
+         */
+        private boolean isDeploymentResponse(GridIoMessage msg) {
+            Object origMsg = msg.message();
+
+            return origMsg instanceof GridDeploymentResponse;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
index 39debb0..9a53315 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.FactoryBuilder;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
@@ -28,6 +29,7 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -35,6 +37,9 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -50,7 +55,10 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  */
 public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
     /** */
-    private static final int NODES = 5;
+    public static final int NODES = 5;
+
+    /** Number of cache updates performed by each continuous query check. */
+    private static final int UPDATES = 100;
 
     /** */
     private boolean client;
@@ -59,27 +67,34 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setCommunicationSpi(communicationSpi());
+
         cfg.setClientMode(client);
         cfg.setPeerClassLoadingEnabled(true);
 
         return cfg;
     }
 
+    /**
+     * @return Communication SPI to use during a test.
+     */
+    protected CommunicationSpi communicationSpi() {
+        return new TcpCommunicationSpi();
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
         startGridsMultiThreaded(NODES - 1);
 
         client = true;
 
         startGrid(NODES - 1);
+
+        client = false;
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
         stopAllGrids();
     }
 
@@ -240,85 +255,158 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultithreadedUpdatesNodeJoin() throws Exception {
+        Ignite client = startGrid("client");
+
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC
+        );
+        IgniteCache<Object, Object> cache = client.createCache(cacheCfg);
+
+        int iterations = 50;
+        int keysNum = 100;
+        int threadsNum = Runtime.getRuntime().availableProcessors();
+
+        CountDownLatch updatesLatch = new CountDownLatch(iterations * keysNum * threadsNum / 2);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Object, Object>>)(Object)evtFilterFactoryCls.newInstance());
+
+        qry.setLocalListener((evts) -> {
+            for (CacheEntryEvent<?, ?> ignored : evts)
+                updatesLatch.countDown();
+        });
+
+        cache.query(qry);
+
+        for (int t = 0; t < threadsNum; t++) {
+            int threadId = t;
+
+            GridTestUtils.runAsync(() -> {
+                for (int i = 0; i < iterations; i++) {
+                    log.info("Iteration #" + (i + 1));
+
+                    for (int k = 0; k < keysNum; k++) {
+                        int key = keysNum * threadId + k;
+
+                        cache.put(key, key);
+                    }
+                }
+            }, "cache-writer-thread-" + threadId);
+        }
+
+        startGrid(NODES);
+
+        assertTrue("Failed to wait for all cache updates invocations. Latch: " + updatesLatch,
+            updatesLatch.await(30, TimeUnit.SECONDS));
+    }
+
+    /**
      * @param ccfg Cache configuration.
      * @param isClient Client.
      * @throws Exception If failed.
      */
     protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
         throws Exception {
-        ignite(0).createCache(ccfg);
-
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-        QueryCursor<?> cur = null;
+        ignite(0).createCache(ccfg);
 
-        final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
             (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
                 loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
 
-        final CountDownLatch latch = new CountDownLatch(10);
+        testContinuousQuery(ccfg, isClient, false, evtFilterFactoryCls);
+        testContinuousQuery(ccfg, isClient, true, evtFilterFactoryCls);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param isClient Client.
+     * @param joinNode If a node should be added to topology after a query is started.
+     * @param evtFilterFactoryCls Remote filter factory class.
+     * @throws Exception If failed.
+     */
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg,
+        boolean isClient, boolean joinNode,
+        Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls) throws Exception {
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        final CountDownLatch latch = new CountDownLatch(UPDATES);
 
         ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
 
-        TestLocalListener localLsnr = new TestLocalListener() {
-            @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
-                throws CacheEntryListenerException {
+        AtomicReference<String> err = new AtomicReference<>();
+
+        TestLocalListener locLsnr = new TestLocalListener() {
+            @Override protected void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                 for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
                     latch.countDown();
 
                     log.info("Received event: " + evt);
+
+                    int key = evt.getKey();
+
+                    if (key % 2 == 0)
+                        err.set("Event received on entry, that doesn't pass a filter: " + key);
                 }
             }
         };
 
+        qry.setLocalListener(locLsnr);
+
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactoryCls.newInstance());
+
         MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
             new MutableCacheEntryListenerConfiguration<>(
-                new FactoryBuilder.SingletonFactory<>(localLsnr),
+                new FactoryBuilder.SingletonFactory<>(locLsnr),
                 (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
-                    (Object)evtFilterFactory.newInstance(),
+                    (Object)evtFilterFactoryCls.newInstance(),
                 true,
                 true
             );
 
-        qry.setLocalListener(localLsnr);
+        IgniteCache<Integer, Integer> cache;
 
-        qry.setRemoteFilterFactory(
-            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
-
-        IgniteCache<Integer, Integer> cache = null;
-
-        try {
-            if (isClient)
-                cache = grid(NODES - 1).cache(ccfg.getName());
-            else
-                cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
-
-            cur = cache.query(qry);
+        cache = isClient
+            ? grid(NODES - 1).cache(ccfg.getName())
+            : grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
 
+        try (QueryCursor<?> cur = cache.query(qry)) {
             cache.registerCacheEntryListener(lsnrCfg);
 
-            for (int i = 0; i < 10; i++)
+            if (joinNode) {
+                startGrid(NODES);
+                awaitPartitionMapExchange();
+            }
+
+            for (int i = 0; i < UPDATES; i++)
                 cache.put(i, i);
 
-            assertTrue(latch.await(3, TimeUnit.SECONDS));
-        }
-        finally {
-            if (cur != null)
-                cur.close();
+            assertTrue("Failed to wait for local listener invocations: " + latch.getCount(),
+                latch.await(3, TimeUnit.SECONDS));
 
-            if (cache != null)
-                cache.deregisterCacheEntryListener(lsnrCfg);
+            assertNull(err.get(), err.get());
         }
     }
 
     /**
-     *
      * @param cacheMode Cache mode.
      * @param backups Number of backups.
      * @param atomicityMode Cache atomicity mode.
      * @return Cache configuration.
      */
-    private CacheConfiguration<Object, Object> cacheConfiguration(
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         int backups,
         CacheAtomicityMode atomicityMode) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryDeserializationErrorOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryDeserializationErrorOnNodeJoinTest.java
new file mode 100644
index 0000000..1ef027f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryDeserializationErrorOnNodeJoinTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that errors during CQ remote filter deserialization don't prevent a node from joining a cluster.
+ */
+public class ContinuousQueryDeserializationErrorOnNodeJoinTest extends GridCommonAbstractTest {
+    /** IP finder shared by the test nodes. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+        ((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        igniteCfg.setPeerClassLoadingEnabled(false); // Remote nodes can't load the external filter class.
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDeserializationErrorOnJoiningNode() throws Exception {
+        String cacheName = "cache";
+        int recordsNum = 1000;
+
+        Ignite node1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cacheNode1 = node1.getOrCreateCache(cacheName);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+        qry.setLocalListener((evt) -> log.info("Event received: " + evt));
+        qry.setRemoteFilterFactory(remoteFilterFactory());
+
+        cacheNode1.query(qry);
+
+        // Deserialization error will happen when the new node tries to deserialize the discovery data.
+        Ignite node2 = startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        // Check that node and cache are functional.
+        IgniteCache<Integer, Integer> cacheNode2 = node2.cache(cacheName);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < recordsNum; i++) {
+            IgniteCache<Integer, Integer> cache =
+                rnd.nextBoolean() ? cacheNode1 : cacheNode2;
+
+            cache.put(i, i);
+        }
+
+        for (int i = 0; i < recordsNum; i++) {
+            assertEquals(Integer.valueOf(i), cacheNode1.get(i));
+            assertEquals(Integer.valueOf(i), cacheNode2.get(i));
+        }
+    }
+
+    /**
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return Remote filter factory instantiated from a class loaded by the external class loader.
+     * @throws ReflectiveOperationException In case of instantiation failure.
+     */
+    private <K, V> Factory<? extends CacheEntryEventFilter<K, V>> remoteFilterFactory()
+        throws ReflectiveOperationException {
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+        return (Factory<? extends CacheEntryEventFilter<K, V>>)(Object)evtFilterFactoryCls.newInstance();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
index 945e04e..8fd8a23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -78,6 +77,14 @@ public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRemoteFilterFactoryFromClientToServer() throws Exception {
+        check("server1", "client", "server2");
+    }
+
+    /**
      * @param node1Name Node 1 name.
      * @param node2Name Node 2 name.
      * @param node3Name Node 3 name.
@@ -98,9 +105,6 @@ public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest
         qry1.setRemoteFilterFactory(new DummyEventFilterFactory<>());
         qry2.setRemoteFilterFactory(new DummyEventFilterFactory<>());
 
-        final AtomicInteger client1Evts = new AtomicInteger(0);
-        final AtomicInteger client2Evts = new AtomicInteger(0);
-
         final CountDownLatch latch1 = new CountDownLatch(20);
         final CountDownLatch latch2 = new CountDownLatch(10);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java
index 2907b90..7688a7b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
@@ -30,6 +32,8 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.TestFailureHandler;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -63,14 +67,16 @@ public class GridCacheContinuousQueryNodesFilteringTest extends GridCommonAbstra
     @Test
     public void testNodeWithAttributeFailure() throws Exception {
         try (Ignite node1 = startNodeWithCache()) {
-            GridStringLogger log = new GridStringLogger();
+            CountDownLatch latch = new CountDownLatch(1);
 
-            try (Ignite node2 = startGrid("node2", getConfiguration("node2", true, log))) {
-                fail();
-            }
-            catch (IgniteException ignored) {
-                assertTrue(log.toString().contains("Class not found for continuous query remote filter " +
-                    "[name=org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter]"));
+            FailureHandler failHnd = new TestFailureHandler(false, latch);
+
+            IgniteConfiguration node2Cfg = getConfiguration("node2", true, null)
+                .setFailureHandler(failHnd);
+
+            try (Ignite node2 = startGrid(node2Cfg)) {
+                assertTrue("Failure handler hasn't been invoked on the joined node.",
+                    latch.await(5, TimeUnit.SECONDS));
             }
         }
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index 1de177c..8e8e43b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationNearEnabledTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryDeserializationErrorOnNodeJoinTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryMarshallerTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest;
@@ -57,6 +58,7 @@ import org.junit.runners.Suite;
     CacheKeepBinaryIterationNearEnabledTest.class,
     GridCacheContinuousQueryPartitionedOnlySelfTest.class,
     CacheContinuousQueryOperationP2PTest.class,
+    ContinuousQueryDeserializationErrorOnNodeJoinTest.class,
     CacheContinuousBatchAckTest.class,
     CacheContinuousQueryOrderingEventTest.class,
     IgniteCacheContinuousQueryClientTest.class,
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java
index 2af8005..8403656 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java
@@ -24,6 +24,8 @@ import org.apache.ignite.internal.processors.cache.datastructures.replicated.Gri
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedNodeRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.multijvm.GridCacheAtomicMultiJvmFullApiSelfTest;
 import org.apache.ignite.internal.processors.cache.multijvm.GridCachePartitionedMultiJvmFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLongP2PTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
 import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
 import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest;
 import org.junit.BeforeClass;
@@ -43,7 +45,9 @@ import org.junit.runners.Suite;
     GridCachePartitionedSequenceApiSelfTest.class,
     GridCacheAtomicMultiJvmFullApiSelfTest.class,
     GridCachePartitionedMultiJvmFullApiSelfTest.class,
-    GridP2PContinuousDeploymentSelfTest.class
+    GridP2PContinuousDeploymentSelfTest.class,
+    CacheContinuousQueryOperationP2PTest.class,
+    CacheContinuousQueryLongP2PTest.class
 })
 public class ZookeeperDiscoverySpiTestSuite3 {
     /** */