You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/01/17 05:42:01 UTC

[ignite] branch master updated: IGNITE-12342 Continuous Queries: Remote filter and transformer have to run with appropriate SecurityContext (#7125)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a8b250  IGNITE-12342 Continuous Queries: Remote filter and transformer have to run with appropriate SecurityContext (#7125)
0a8b250 is described below

commit 0a8b2502ceabd590f3b271a2c292e95e934c2f3a
Author: Denis Garus <ga...@gmail.com>
AuthorDate: Fri Jan 17 08:41:39 2020 +0300

    IGNITE-12342 Continuous Queries: Remote filter and transformer have to run with appropriate SecurityContext (#7125)
---
 .../org/apache/ignite/internal/IgniteFeatures.java |   5 +-
 .../internal/cluster/ClusterGroupAdapter.java      |  11 +-
 .../processors/cache/GridCacheAdapter.java         |   8 +-
 .../AbstractSecurityAwareExternalizable.java       |  68 ++++++++++
 .../continuous/CacheContinuousQueryManager.java    |  69 ++++++++--
 .../query/continuous/SecurityAwareFilter.java      |  62 +++++++++
 .../continuous/SecurityAwareFilterFactory.java     |  52 ++++++++
 .../SecurityAwareTransformerFactory.java           |  67 ++++++++++
 .../GridResourceProxiedIgniteInjector.java         |  42 +-----
 .../processors/security/SecurityUtils.java         |   2 +-
 ...cheOperationRemoteSecurityContextCheckTest.java |  21 ++-
 .../AbstractRemoteSecurityContextCheckTest.java    | 122 ++++++++++++------
 .../cache/ContinuousQueryPermissionCheckTest.java  | 143 +++++++++++++++++++++
 ...tinuousQueryRemoteSecurityContextCheckTest.java | 125 ++++++++++++++++++
 ...tinuousQueryRemoteSecurityContextCheckTest.java | 106 +++++++++++++++
 ...hTransformerRemoteSecurityContextCheckTest.java | 118 +++++++++++++++++
 ...tryProcessorRemoteSecurityContextCheckTest.java |   2 +-
 .../ScanQueryRemoteSecurityContextCheckTest.java   |   2 +-
 ...teTaskCancelRemoteSecurityContextCheckTest.java |   1 -
 ...DataStreamerRemoteSecurityContextCheckTest.java |   2 +-
 .../ignite/testsuites/SecurityTestSuite.java       |   6 +
 21 files changed, 920 insertions(+), 114 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 2266c58..956d72f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -80,7 +80,10 @@ public enum IgniteFeatures {
     PME_FREE_SWITCH(19),
 
     /** Master key change. See {@link GridEncryptionManager#changeMasterKey(String)}. */
-    MASTER_KEY_CHANGE(20);
+    MASTER_KEY_CHANGE(20),
+
+    /** ContinuousQuery with security subject id support. */
+    CONT_QRY_SECURITY_AWARE(21);
 
     /**
      * Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 7579ea2..3d4b71f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteComputeImpl;
 import org.apache.ignite.internal.IgniteEventsImpl;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteMessagingImpl;
 import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -769,7 +770,8 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
         private boolean clients;
 
         /** Injected Ignite instance. */
-        private transient IgniteKernal ignite;
+        @IgniteInstanceResource
+        private transient IgniteEx ignite;
 
         /**
          * @param cacheName Cache name.
@@ -800,13 +802,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
 
             return false;
         }
-
-        /** */
-        @IgniteInstanceResource
-        private void ignite(Ignite ignite) {
-            if (ignite != null)
-                this.ignite = ignite instanceof IgniteKernal ? (IgniteKernal)ignite : IgnitionEx.gridx(ignite.name());
-        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index abbbd57..f660c6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -6688,6 +6688,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         protected ComputeJobContext jobCtx;
 
         /** Injected grid instance. */
+        @IgniteInstanceResource
         protected IgniteEx ignite;
 
         /** Affinity topology version. */
@@ -6755,13 +6756,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
             return true;
         }
-
-        /** */
-        @IgniteInstanceResource
-        private void ignite(Ignite ignite) {
-            if (ignite != null)
-                this.ignite = ignite instanceof IgniteEx ? (IgniteEx)ignite : IgnitionEx.gridx(ignite.name());
-        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractSecurityAwareExternalizable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractSecurityAwareExternalizable.java
new file mode 100644
index 0000000..cc1fed0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractSecurityAwareExternalizable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Abstract security aware Externalizable.
+ */
+public abstract class AbstractSecurityAwareExternalizable<T> implements Externalizable {
+    /** Security subject id. */
+    protected UUID subjectId;
+
+    /** Original component. */
+    protected T original;
+
+    /**
+     * Default constructor.
+     */
+    protected AbstractSecurityAwareExternalizable() {
+        // No-op.
+    }
+
+    /**
+     * @param subjectId Security subject id.
+     * @param original Original component.
+     */
+    protected AbstractSecurityAwareExternalizable(UUID subjectId, T original) {
+        this.subjectId = requireNonNull(subjectId, "Parameter 'subjectId' cannot be null.");
+        this.original = requireNonNull(original, "Parameter 'original' cannot be null.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeUuid(out, subjectId);
+
+        out.writeObject(original);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        subjectId = U.readUuid(in);
+
+        original = (T)in.readObject();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index d2fecbf..ecb32ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 import javax.cache.Cache;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
@@ -86,11 +87,13 @@ import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.IgniteFeatures.CONT_QRY_SECURITY_AWARE;
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
 
 /**
  * Continuous queries manager.
  */
-public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
+public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
     /** */
     private static final byte CREATED_FLAG = 0b0001;
 
@@ -514,9 +517,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      */
     public UUID executeQuery(@Nullable final CacheEntryUpdatedListener locLsnr,
         @Nullable final EventListener locTransLsnr,
-        @Nullable final CacheEntryEventSerializableFilter rmtFilter,
-        @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
-        @Nullable final Factory<? extends IgniteClosure> rmtTransFactory,
+        @Nullable final CacheEntryEventSerializableFilter<K, V> rmtFilter,
+        @Nullable final Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        @Nullable final Factory<IgniteClosure<K, V>> rmtTransFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
@@ -533,8 +536,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                         locTransLsnr,
-                        rmtFilterFactory,
-                        rmtTransFactory,
+                        securityAwareFilterFactory(rmtFilterFactory),
+                        securityAwareTransformerFactory(rmtTransFactory),
                         true,
                         false,
                         !includeExpired,
@@ -549,7 +552,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                         locLsnr,
-                        rmtFilterFactory,
+                        securityAwareFilterFactory(rmtFilterFactory),
                         true,
                         false,
                         !includeExpired,
@@ -567,7 +570,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                         locLsnr,
-                        rmtFilter,
+                        securityAwareFilter(rmtFilter),
                         true,
                         false,
                         !includeExpired,
@@ -852,6 +855,50 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param factory Original factory.
+     * @return Security aware factory.
+     */
+    private Factory<IgniteClosure<K, V>> securityAwareTransformerFactory(Factory<IgniteClosure<K, V>> factory) {
+        return securityAwareComponent(factory, SecurityAwareTransformerFactory::new);
+    }
+
+    /**
+     * @param factory Original factory.
+     * @return Security aware factory.
+     */
+    private Factory<CacheEntryEventFilter<K, V>> securityAwareFilterFactory(Factory<CacheEntryEventFilter<K, V>> factory) {
+        return securityAwareComponent(factory, SecurityAwareFilterFactory::new);
+    }
+
+    /**
+     * @param filter Original filter.
+     * @return Security aware filter.
+     */
+    private CacheEntryEventSerializableFilter<K, V> securityAwareFilter(CacheEntryEventSerializableFilter<K, V> filter) {
+        return securityAwareComponent(filter, SecurityAwareFilter::new);
+    }
+
+    /**
+     * @param component Original component.
+     * @param f Function that converts the original component to a security aware component.
+     * @return Security aware component.
+     */
+    private <T> T securityAwareComponent(T component, BiFunction<UUID, T, T> f) {
+        if (component == null)
+            return null;
+
+        GridKernalContext ctx = cctx.kernalContext();
+
+        if (ctx.security().enabled() && allNodesSupports(ctx.discovery().allNodes(), CONT_QRY_SECURITY_AWARE)) {
+            final UUID subjectId = ctx.security().securityContext().subject().id();
+
+            return f.apply(subjectId, component);
+        }
+
+        return component;
+    }
+
+    /**
      * @param keepBinary Keep binary flag.
      * @param filter Filter.
      * @return Iterable for events created for existing cache entries.
@@ -1057,14 +1104,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 new IgniteOutClosure<CacheContinuousQueryHandler>() {
                     @Override public CacheContinuousQueryHandler apply() {
                         CacheContinuousQueryHandler hnd;
-                        Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+                        Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
 
                         if (rmtFilterFactory != null)
                             hnd = new CacheContinuousQueryHandlerV2(
                                 cctx.name(),
                                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                                 locLsnr,
-                                rmtFilterFactory,
+                                securityAwareFilterFactory(rmtFilterFactory),
                                 cfg.isOldValueRequired(),
                                 cfg.isSynchronous(),
                                 false,
@@ -1090,7 +1137,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                 cctx.name(),
                                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                                 locLsnr,
-                                jCacheFilter,
+                                securityAwareFilter(jCacheFilter),
                                 cfg.isOldValueRequired(),
                                 cfg.isSynchronous(),
                                 false,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
new file mode 100644
index 0000000..fe7cca4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Security aware remote filter.
+ */
+public class SecurityAwareFilter<K, V> extends AbstractSecurityAwareExternalizable<CacheEntryEventFilter<K, V>>
+    implements CacheEntryEventSerializableFilter<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite. */
+    @IgniteInstanceResource
+    private transient IgniteEx ignite;
+
+    /**
+     * Default constructor.
+     */
+    public SecurityAwareFilter() {
+        // No-op.
+    }
+
+    /**
+     * @param subjectId Security subject id.
+     * @param original Original filter.
+     */
+    public SecurityAwareFilter(UUID subjectId, CacheEntryEventFilter<K, V> original) {
+        super(subjectId, original);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) throws CacheEntryListenerException {
+        try (OperationSecurityContext c = ignite.context().security().withContext(subjectId)) {
+            return original.evaluate(evt);
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilterFactory.java
new file mode 100644
index 0000000..0460e98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+
+/**
+ * Security aware remote filter factory.
+ */
+public class SecurityAwareFilterFactory<K, V> extends
+    AbstractSecurityAwareExternalizable<Factory<CacheEntryEventFilter<K, V>>> implements
+    Factory<CacheEntryEventFilter<K, V>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Default constructor.
+     */
+    public SecurityAwareFilterFactory() {
+        // No-op.
+    }
+
+    /**
+     * @param subjectId Security subject id.
+     * @param original Original factory.
+     */
+    public SecurityAwareFilterFactory(UUID subjectId, Factory<CacheEntryEventFilter<K, V>> original) {
+        super(subjectId, original);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter<K, V> create() {
+        return new SecurityAwareFilter<>(subjectId, original.create());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
new file mode 100644
index 0000000..b765464
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Security aware transformer factory.
+ */
+public class SecurityAwareTransformerFactory<E, R> extends
+    AbstractSecurityAwareExternalizable<Factory<IgniteClosure<E, R>>> implements Factory<IgniteClosure<E, R>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Default constructor.
+     */
+    public SecurityAwareTransformerFactory() {
+        // No-op.
+    }
+
+    /**
+     * @param subjectId Security subject id.
+     * @param original Original factory.
+     */
+    public SecurityAwareTransformerFactory(UUID subjectId, Factory<IgniteClosure<E, R>> original) {
+        super(subjectId, original);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteClosure<E, R> create() {
+        final IgniteClosure<E, R> cl = original.create();
+
+        return new IgniteClosure<E, R>() {
+            /** Ignite. */
+            @IgniteInstanceResource
+            private IgniteEx ignite;
+
+            /** {@inheritDoc} */
+            @Override public R apply(E e) {
+                try (OperationSecurityContext c = ignite.context().security().withContext(subjectId)) {
+                    return cl.apply(e);
+                }
+            }
+        };
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java
index de9dc45..1a19de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java
@@ -17,39 +17,16 @@
 
 package org.apache.ignite.internal.processors.resource;
 
-import java.util.concurrent.Callable;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteRunnable;
 
+import static org.apache.ignite.internal.processors.security.SecurityUtils.isSystemType;
 import static org.apache.ignite.internal.processors.security.sandbox.SandboxIgniteComponentProxy.proxy;
 
 /** Ignite instance injector. */
 public class GridResourceProxiedIgniteInjector extends GridResourceBasicInjector<Ignite> {
-    /** Array of classes that should get a proxied instance of Ignite. */
-    private static final Class[] PROXIED_CLASSES = new Class[]{
-        Runnable.class,
-        IgniteRunnable.class,
-        Callable.class,
-        IgniteCallable.class,
-        ComputeTask.class,
-        ComputeJob.class,
-        IgniteClosure.class,
-        IgniteBiClosure.class,
-        IgniteDataStreamer.class,
-        IgnitePredicate.class,
-        IgniteBiPredicate.class,
-    };
-
     /**
      * @param rsrc Resource.
      */
@@ -59,19 +36,8 @@ public class GridResourceProxiedIgniteInjector extends GridResourceBasicInjector
 
     /** */
     private Ignite ignite(Object target) {
-        return shouldUseProxy(target) ? proxy(Ignite.class, getResource()) : getResource();
-    }
-
-    /**
-     * @return True if {@code target} should get a proxy instance of Ignite.
-     */
-    private boolean shouldUseProxy(Object target){
-        for (Class cls : PROXIED_CLASSES) {
-            if (cls.isInstance(target))
-                return true;
-        }
-
-        return false;
+        return !isSystemType(((IgniteEx)getResource()).context(), target)
+            ? proxy(Ignite.class, getResource()) : getResource();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
index d2b1eaf..298eddb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
@@ -170,7 +170,7 @@ public class SecurityUtils {
     /**
      * @return True if class of {@code target} is a system type.
      */
-    private static boolean isSystemType(GridKernalContext ctx, Object target) {
+    public static boolean isSystemType(GridKernalContext ctx, Object target) {
         Class cls = target instanceof GridInternalWrapper
             ? ((GridInternalWrapper)target).userObject().getClass()
             : target.getClass();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java
index f7380c2..97ebcc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java
@@ -48,13 +48,14 @@ public abstract class AbstractCacheOperationRemoteSecurityContextCheckTest exten
     }
 
     /**
-     * Getting the key that is contained on primary partition on passed node for {@link #CACHE_NAME} cache.
+     * Gets a key for which the passed node holds the primary partition of the given cache.
      *
      * @param ignite Node.
+     * @param cacheName Cache name.
      * @return Key.
      */
-    protected Integer prmKey(IgniteEx ignite) {
-        return findKeys(ignite.localNode(), ignite.cache(CACHE_NAME), 1, 0, 0)
+    protected Integer primaryKey(IgniteEx ignite, String cacheName) {
+        return findKeys(ignite.localNode(), ignite.cache(cacheName), 1, 0, 0)
             .stream()
             .findFirst()
             .orElseThrow(() -> new IllegalStateException(ignite.name() + " isn't primary node for any key."));
@@ -63,10 +64,20 @@ public abstract class AbstractCacheOperationRemoteSecurityContextCheckTest exten
     /**
      * Getting the key that is contained on primary partition on passed node for {@link #CACHE_NAME} cache.
      *
+     * @param ignite Node.
+     * @return Key.
+     */
+    protected Integer primaryKey(IgniteEx ignite) {
+        return primaryKey(ignite, CACHE_NAME);
+    }
+
+    /**
+     * Getting the key that is contained on primary partition on passed node for {@link #CACHE_NAME} cache.
+     *
      * @param nodeName Node name.
      * @return Key.
      */
-    protected Integer prmKey(String nodeName) {
-        return prmKey((IgniteEx)G.ignite(nodeName));
+    protected Integer primaryKey(String nodeName) {
+        return primaryKey((IgniteEx)G.ignite(nodeName));
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
index 9acc07e..40e23b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.processors.security;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
@@ -130,7 +130,7 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
      */
     protected void runAndCheck(IgniteEx initiator, Stream<IgniteRunnable> ops) {
         ops.forEach(r -> {
-            VERIFIER.clear().initiator(initiator);
+            VERIFIER.initiator(initiator);
 
             setupVerifier(VERIFIER);
 
@@ -147,12 +147,12 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
         /**
          * Map that contains an expected behaviour.
          */
-        private final Map<String, T2<Integer, Integer>> expInvokes = new HashMap<>();
+        private final Map<T2<String, String>, T2<Integer, AtomicInteger>> expInvokes = new ConcurrentHashMap<>();
 
         /**
-         * List of registered security subjects.
+         * Checked errors.
          */
-        private final List<T2<UUID, String>> registeredSubjects = new ArrayList<>();
+        private final Collection<String> errors = new ArrayBlockingQueue<>(10);
 
         /**
          * Expected security subject id.
@@ -160,24 +160,35 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
         private UUID expSecSubjId;
 
         /** */
-        public Verifier clear() {
-            registeredSubjects.clear();
+        private void clear() {
             expInvokes.clear();
 
-            expSecSubjId = null;
+            errors.clear();
 
-            return this;
+            expSecSubjId = null;
         }
 
         /**
-         * Adds expected behaivior the method {@link #register} will be invoke exp times on the node with
-         * passed name.
+         * Adds expected behavior: the method {@link #register} will be invoked the expected number of times on the
+         * node with the passed name.
          *
          * @param nodeName Node name.
          * @param num Expected number of invokes.
          */
         public Verifier expect(String nodeName, int num) {
-            expInvokes.put(nodeName, new T2<>(num, 0));
+            return expect(nodeName, null, num);
+        }
+
+        /**
+         * Adds expected behavior: the method {@link #register} will be invoked the expected number of times on the
+         * node with the passed name and the passed operation name.
+         *
+         * @param nodeName Node name.
+         * @param opName Operation name.
+         * @param num Expected number of invokes.
+         */
+        public Verifier expect(String nodeName, String opName, int num) {
+            expInvokes.put(new T2<>(nodeName, opName), new T2<>(num, new AtomicInteger()));
 
             return this;
         }
@@ -186,49 +197,75 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
          * Registers a security subject referred for {@code localIgnite} and increments invoke counter.
          */
         public void register() {
-            register((IgniteEx)localIgnite());
+            register((IgniteEx)localIgnite(), null);
+        }
+
+        /**
+         * Registers a security subject referred for {@code localIgnite} with the passed operation name and increments
+         * invoke counter.
+         *
+         * @param opName Operation name.
+         */
+        public void register(String opName) {
+            register((IgniteEx)localIgnite(), opName);
         }
 
         /**
          * Registers a security subject referred for the passed {@code ignite} and increments invoke counter.
+         *
+         * @param ignite Instance of ignite.
          */
-        public synchronized void register(IgniteEx ignite) {
-            registeredSubjects.add(new T2<>(secSubjectId(ignite), ignite.name()));
+        public void register(IgniteEx ignite) {
+            register(ignite, null);
+        }
 
-            expInvokes.computeIfPresent(ignite.name(), (name, t2) -> {
-                Integer val = t2.getValue();
+        /**
+         * Registers a security subject referred for the passed {@code ignite} with the passed operation name and
+         * increments invoke counter.
+         *
+         * @param ignite Instance of ignite.
+         * @param opName Operation name.
+         */
+        public void register(IgniteEx ignite, String opName) {
+            if (expSecSubjId == null) {
+                error("SubjectId cannot be null.");
 
-                t2.setValue(++val);
+                return;
+            }
+
+            UUID actualSubjId = secSubjectId(ignite);
+
+            if (!expSecSubjId.equals(actualSubjId)) {
+                error("Actual subjectId does not equal expected subjectId " + "[expected=" + expSecSubjId +
+                    ", actual=" + actualSubjId + "].");
+
+                return;
+            }
 
-                return t2;
-            });
+            T2<Integer, AtomicInteger> v = expInvokes.get(new T2<>(ignite.name(), opName));
+
+            if (v != null)
+                v.get2().incrementAndGet();
+            else
+                error("Unexpected registration parameters [node=" + ignite.name() + ", opName=" + opName + "].");
         }
 
         /**
          * Checks result of test and clears expected behavior.
          */
         public void checkResult() {
-            registeredSubjects.forEach(t ->
-                assertEquals("Invalide security context on node " + t.get2(),
-                    expSecSubjId, t.get1())
-            );
-
-            expInvokes.forEach((key, value) ->
-                assertEquals("Node " + key + ". Execution of register: ",
-                    value.get1(), value.get2()));
+            if(!errors.isEmpty())
+                throw new AssertionError(errors.stream().reduce((s1, s2) -> s1 + "\n" + s2).get());
 
-            clear();
-        }
-
-        /** */
-        private Verifier expectSubjId(UUID expSecSubjId) {
-            this.expSecSubjId = expSecSubjId;
-
-            return this;
+            expInvokes.forEach((k, v) -> assertEquals("Node \"" + k.get1() + '\"' +
+                (k.get2() != null ? ", operation \"" + k.get2() + '\"' : "") +
+                ". Execution of register: ", v.get1(), Integer.valueOf(v.get2().get())));
         }
 
         /** */
         public Verifier initiator(IgniteEx initiator) {
+            clear();
+
             expSecSubjId = secSubjectId(initiator);
 
             return this;
@@ -238,6 +275,13 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
         private UUID secSubjectId(IgniteEx node) {
             return node.context().security().securityContext().subject().id();
         }
+
+        /**
+         * @param msg Error message.
+         */
+        private void error(String msg) {
+            errors.add(msg);
+        }
     }
 
     /** */
@@ -341,7 +385,7 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
             run();
 
             if (k instanceof Cache.Entry)
-                return (V) ((Cache.Entry)k).getValue();
+                return (V)((Cache.Entry)k).getValue();
 
             return null;
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/ContinuousQueryPermissionCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/ContinuousQueryPermissionCheckTest.java
new file mode 100644
index 0000000..0292c6c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/ContinuousQueryPermissionCheckTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache;
+
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.processors.security.AbstractCacheOperationPermissionCheckTest;
+import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.plugin.security.SecurityPermissionSet;
+import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.plugin.security.SecurityPermission.CACHE_READ;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Tests check a cache permission for continuous queries.
+ */
+@RunWith(JUnit4.class)
+public class ContinuousQueryPermissionCheckTest extends AbstractCacheOperationPermissionCheckTest {
+    /** Test server node name. */
+    private static final String SRV = "srv_test_node";
+
+    /** Test client node name. */
+    private static final String CLNT = "cnt_test_node";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        Ignite srv = startGridAllowAll("server");
+
+        startGrid(SRV, permissions(), true);
+
+        startGrid(CLNT, permissions(), true);
+
+        srv.cluster().state(ClusterState.ACTIVE);
+    }
+
+    /**
+     * Tests {@link ContinuousQuery} from a client.
+     */
+    @Test
+    public void testClientContinuousQuery() {
+        check(n -> startContinuousQuery(grid(CLNT), n));
+    }
+
+    /**
+     * Tests {@link ContinuousQuery} from a server.
+     */
+    @Test
+    public void testServerContinuousQuery() {
+        check(n -> startContinuousQuery(grid(SRV), n));
+    }
+
+    /**
+     * Tests {@link ContinuousQueryWithTransformer} from a client.
+     */
+    @Test
+    public void testClientContinuousQueryWithTransformer() {
+        check(n -> startContinuousQueryWithTransformer(grid(CLNT), n));
+    }
+
+    /**
+     * Tests {@link ContinuousQueryWithTransformer} from a server.
+     */
+    @Test
+    public void testServerContinuousQueryWithTransformer() {
+        check(n -> startContinuousQueryWithTransformer(grid(SRV), n));
+    }
+
+    /**
+     * @param c Consumer that accepts a cache name.
+     */
+    private void check(final Consumer<String> c) {
+        c.accept(CACHE_NAME);
+
+        assertThrowsWithCause(() -> c.accept(FORBIDDEN_CACHE), SecurityException.class);
+    }
+
+    /**
+     * Starts {@link ContinuousQuery}.
+     *
+     * @param node Ignite node.
+     * @param cacheName Cache name.
+     */
+    private void startContinuousQuery(Ignite node, String cacheName) {
+        ContinuousQuery<String, Integer> q = new ContinuousQuery<>();
+
+        q.setLocalListener(e -> {/* No-op. */});
+
+        try (QueryCursor<Cache.Entry<String, Integer>> cur = node.cache(cacheName).query(q)) {
+            // No-op.
+        }
+    }
+
+    /**
+     * Starts {@link ContinuousQueryWithTransformer}.
+     *
+     * @param node Ignite node.
+     * @param cacheName Cache name.
+     */
+    private void startContinuousQueryWithTransformer(Ignite node, String cacheName) {
+        ContinuousQueryWithTransformer<String, Integer, String> q = new ContinuousQueryWithTransformer<>();
+
+        q.setLocalListener(e -> {/* No-op. */});
+
+        q.setRemoteTransformerFactory(() -> e -> "value");
+
+        try (QueryCursor<Cache.Entry<String, Integer>> cur = node.cache(cacheName).query(q)) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @return Subject Ignite permissions.
+     */
+    private SecurityPermissionSet permissions() {
+        return SecurityPermissionSetBuilder.create()
+            .appendCachePermissions(CACHE_NAME, CACHE_READ)
+            .appendCachePermissions(FORBIDDEN_CACHE, EMPTY_PERMS).build();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/AbstractContinuousQueryRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/AbstractContinuousQueryRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..090cb4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/AbstractContinuousQueryRemoteSecurityContextCheckTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache.closure;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.security.AbstractCacheOperationRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+import static org.apache.ignite.Ignition.localIgnite;
+
+/**
+ * The base class for tests that check continuous queries {@link SecurityContext} on a remote node.
+ */
+public class AbstractContinuousQueryRemoteSecurityContextCheckTest extends
+    AbstractCacheOperationRemoteSecurityContextCheckTest {
+    /** Server node to change cache state. */
+    private static final String SRV = "srv";
+
+    /** Cache index. */
+    private static final AtomicInteger CACHE_INDEX = new AtomicInteger();
+
+    /** Open continuous query operation. */
+    protected static final String OPERATION_OPEN_CQ = "open_cq";
+
+    /** Init query, filter or transform operation. */
+    protected static final String OPERATION_CQ_COMPONENT = "cq_component";
+
+    /** Predicate for initial query tests. */
+    protected static final IgniteBiPredicate<Integer, Integer> INITIAL_QUERY_FILTER = (k, v) -> {
+        VERIFIER.register(OPERATION_CQ_COMPONENT);
+
+        return true;
+    };
+
+    /** Remote filter. */
+    protected static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> {
+        VERIFIER.register(OPERATION_CQ_COMPONENT);
+
+        return true;
+    };
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration[] getCacheConfigurations() {
+        return new CacheConfiguration[] {};
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteEx srv = startGridAllowAll(SRV);
+
+        startGridAllowAll(SRV_INITIATOR);
+
+        startClientAllowAll(CLNT_INITIATOR);
+
+        startGridAllowAll(SRV_RUN);
+
+        startClientAllowAll(CLNT_RUN);
+
+        startGridAllowAll(SRV_CHECK);
+
+        srv.cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setupVerifier(Verifier verifier) {
+        verifier
+            .expect(SRV_RUN, OPERATION_OPEN_CQ, 1)
+            .expect(CLNT_RUN, OPERATION_OPEN_CQ, 1)
+            .expect(SRV_CHECK, OPERATION_CQ_COMPONENT, 2);
+    }
+
+    /**
+     * Opens query cursor.
+     *
+     * @param q {@link Query}.
+     * @param init True if data needs to be put into the cache before opening a cursor.
+     */
+    protected void executeQuery(Query<Cache.Entry<Integer, Integer>> q, boolean init) {
+        Ignite ignite = localIgnite();
+
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>()
+                .setName(CACHE_NAME + CACHE_INDEX.incrementAndGet())
+                .setCacheMode(CacheMode.PARTITIONED)
+        );
+
+        if (init)
+            cache.put(primaryKey(grid(SRV_CHECK), cache.getName()), 100);
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(q)) {
+            if (!init)
+                cache.put(primaryKey(grid(SRV_CHECK), cache.getName()), 100);
+
+            cur.getAll();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..767e2ad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryRemoteSecurityContextCheckTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache.closure;
+
+import java.util.function.Consumer;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.junit.Test;
+
+/**
+ * Tests check appropriate security context when the scan query, remote filter, or
+ * remote filter factory of a {@link ContinuousQuery} is executed on a remote node.
+ * <p>
+ * The initiator node broadcasts a task to 'run' node that starts a {@link ContinuousQuery}'s component. That component
+ * is executed on 'check' node. On every step, it is verified that the operation's security context is the
+ * initiator's context.
+ */
+public class ContinuousQueryRemoteSecurityContextCheckTest extends
+    AbstractContinuousQueryRemoteSecurityContextCheckTest {
+    /**
+     * Test initial query of {@link ContinuousQuery}.
+     */
+    @Test
+    public void testInitialQuery() {
+        Consumer<ContinuousQuery<Integer, Integer>> consumer = new Consumer<ContinuousQuery<Integer, Integer>>() {
+            @Override public void accept(ContinuousQuery<Integer, Integer> q) {
+                q.setInitialQuery(new ScanQuery<>(INITIAL_QUERY_FILTER));
+                q.setLocalListener(e -> {/* No-op. */});
+            }
+        };
+
+        runAndCheck(grid(SRV_INITIATOR), operation(consumer, true));
+        runAndCheck(grid(CLNT_INITIATOR), operation(consumer, true));
+    }
+
+    /**
+     * Tests remote filter factory of {@link ContinuousQuery}.
+     */
+    @Test
+    public void testRemoteFilterFactory() {
+        Consumer<ContinuousQuery<Integer, Integer>> consumer = new Consumer<ContinuousQuery<Integer, Integer>>() {
+            @Override public void accept(ContinuousQuery<Integer, Integer> q) {
+                q.setRemoteFilterFactory(() -> RMT_FILTER);
+            }
+        };
+
+        runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+        runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+    }
+
+    /**
+     * Tests remote filter of {@link ContinuousQuery}.
+     */
+    @Test
+    public void testRemoteFilter() {
+        Consumer<ContinuousQuery<Integer, Integer>> consumer = new Consumer<ContinuousQuery<Integer, Integer>>() {
+            @Override public void accept(ContinuousQuery<Integer, Integer> q) {
+                q.setRemoteFilter(RMT_FILTER);
+            }
+        };
+
+        runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+        runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+    }
+
+    /**
+     * @param c Consumer that setups a {@link ContinuousQuery}.
+     * @param init True if data needs to be put into the cache before opening a cursor.
+     * @return Test operation.
+     */
+    private IgniteRunnable operation(Consumer<ContinuousQuery<Integer, Integer>> c, boolean init) {
+        return () -> {
+            VERIFIER.register(OPERATION_OPEN_CQ);
+
+            ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+            c.accept(cq);
+
+            executeQuery(cq, init);
+        };
+    }
+
+    /**
+     * @param c Consumer that setups a {@link ContinuousQuery}.
+     * @return Test operation.
+     */
+    private IgniteRunnable operation(Consumer<ContinuousQuery<Integer, Integer>> c) {
+       return operation(c, false);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..f9ed65f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache.closure;
+
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.junit.Test;
+
+/**
+ * Tests check appropriate security context when the scan query, transformer factory, or remote filter factory of a
+ * {@link ContinuousQueryWithTransformer} is executed on a remote node.
+ * <p>
+ * The initiator node broadcasts a task to 'run' node that starts a {@link ContinuousQueryWithTransformer}'s component.
+ * That component is executed on 'check' node. On every step, it is verified that the operation's security context
+ * is the initiator's context.
+ */
+public class ContinuousQueryWithTransformerRemoteSecurityContextCheckTest extends
+    AbstractContinuousQueryRemoteSecurityContextCheckTest {
+    /**
+     * Tests initial query of {@link ContinuousQueryWithTransformer}.
+     */
+    @Test
+    public void testInitialQuery() {
+        Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> consumer =
+            new Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>>() {
+                @Override public void accept(ContinuousQueryWithTransformer<Integer, Integer, Integer> q) {
+                    q.setInitialQuery(new ScanQuery<>(INITIAL_QUERY_FILTER));
+                    q.setRemoteTransformerFactory(() -> Cache.Entry::getValue);
+                    q.setLocalListener(e -> {/* No-op. */});
+                }
+            };
+
+        runAndCheck(grid(SRV_INITIATOR), operation(consumer, true));
+        runAndCheck(grid(CLNT_INITIATOR), operation(consumer, true));
+    }
+
+    /**
+     * Tests remote filter factory of {@link ContinuousQueryWithTransformer}.
+     */
+    @Test
+    public void testRemoteFilterFactory() {
+        Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> consumer =
+            new Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>>() {
+                @Override public void accept(ContinuousQueryWithTransformer<Integer, Integer, Integer> q) {
+                    q.setRemoteFilterFactory(() -> RMT_FILTER);
+                    q.setRemoteTransformerFactory(() -> Cache.Entry::getValue);
+                }
+            };
+
+        runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+        runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+    }
+
+    /**
+     * Tests transformer factory of {@link ContinuousQueryWithTransformer}.
+     */
+    @Test
+    public void testTransformerFactory() {
+        Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> consumer =
+            new Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>>() {
+                @Override public void accept(ContinuousQueryWithTransformer<Integer, Integer, Integer> q) {
+                    q.setRemoteTransformerFactory(() -> e -> {
+                        VERIFIER.register(OPERATION_CQ_COMPONENT);
+
+                        return e.getValue();
+                    });
+                    q.setLocalListener(e -> {/* No-op. */});
+                }
+            };
+
+        runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+        runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+    }
+
+    /**
+     * @param c Consumer that setups a {@link ContinuousQueryWithTransformer}.
+     * @param init True if data needs to be put into the cache before opening a cursor.
+     * @return Test operation.
+     */
+    private IgniteRunnable operation(Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> c,
+        boolean init) {
+        return () -> {
+            VERIFIER.register(OPERATION_OPEN_CQ);
+
+            ContinuousQueryWithTransformer<Integer, Integer, Integer> cq = new ContinuousQueryWithTransformer<>();
+
+            c.accept(cq);
+
+            executeQuery(cq, init);
+        };
+    }
+
+    /**
+     * @param c Consumer that setups a {@link ContinuousQueryWithTransformer}.
+     * @return Test operation.
+     */
+    private IgniteRunnable operation(Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> c) {
+        return operation(c, false);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java
index 4ca1f60..facd7ec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java
@@ -87,7 +87,7 @@ public class EntryProcessorRemoteSecurityContextCheckTest extends AbstractCacheO
      * @return Stream of runnables to call invoke methods.
      */
     private Stream<IgniteRunnable> operations() {
-        final Integer key = prmKey(grid(SRV_CHECK));
+        final Integer key = primaryKey(grid(SRV_CHECK));
 
         return Stream.<IgniteRunnable>of(
             () -> localIgnite().<Integer, Integer>cache(CACHE_NAME).invoke(key, createRunner()),
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java
index 27f7116..24035ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java
@@ -73,7 +73,7 @@ public class ScanQueryRemoteSecurityContextCheckTest extends AbstractCacheOperat
     @Test
     public void test() throws Exception {
         grid(SRV_INITIATOR).cache(CACHE_NAME)
-            .put(prmKey(grid(SRV_CHECK)), 1);
+            .put(primaryKey(grid(SRV_CHECK)), 1);
 
         awaitPartitionMapExchange();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
index 15c342c..c79245d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
@@ -126,7 +126,6 @@ public class ComputeTaskCancelRemoteSecurityContextCheckTest extends AbstractRem
     /** */
     private void checkCancel(IgniteEx initator, IgniteEx rmt, Consumer<IgniteFuture> consumer) throws Exception {
         VERIFIER
-            .clear()
             .initiator(initator)
             .expect(rmt.name(), 1);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java
index fbb82aa..fa7bb8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java
@@ -74,7 +74,7 @@ public class DataStreamerRemoteSecurityContextCheckTest extends AbstractCacheOpe
             try (IgniteDataStreamer<Integer, Integer> strm = Ignition.localIgnite().dataStreamer(CACHE_NAME)) {
                 strm.receiver(StreamVisitor.from(new ExecRegisterAndForwardAdapter<>(endpoints())));
 
-                strm.addData(prmKey(grid(SRV_CHECK)), 100);
+                strm.addData(primaryKey(grid(SRV_CHECK)), 100);
             }
         };
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 1d54e75..eb8c88b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -19,9 +19,12 @@ package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.processors.security.InvalidServerTest;
 import org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCheckTest;
+import org.apache.ignite.internal.processors.security.cache.ContinuousQueryPermissionCheckTest;
 import org.apache.ignite.internal.processors.security.cache.EntryProcessorPermissionCheckTest;
 import org.apache.ignite.internal.processors.security.cache.ScanQueryPermissionCheckTest;
 import org.apache.ignite.internal.processors.security.cache.closure.CacheLoadRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.processors.security.cache.closure.ContinuousQueryRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.processors.security.cache.closure.ContinuousQueryWithTransformerRemoteSecurityContextCheckTest;
 import org.apache.ignite.internal.processors.security.cache.closure.EntryProcessorRemoteSecurityContextCheckTest;
 import org.apache.ignite.internal.processors.security.cache.closure.ScanQueryRemoteSecurityContextCheckTest;
 import org.apache.ignite.internal.processors.security.client.ThinClientPermissionCheckTest;
@@ -52,6 +55,7 @@ import org.junit.runners.Suite;
     EntryProcessorPermissionCheckTest.class,
     ComputePermissionCheckTest.class,
     ThinClientPermissionCheckTest.class,
+    ContinuousQueryPermissionCheckTest.class,
 
     DistributedClosureRemoteSecurityContextCheckTest.class,
     ComputeTaskRemoteSecurityContextCheckTest.class,
@@ -61,6 +65,8 @@ import org.junit.runners.Suite;
     EntryProcessorRemoteSecurityContextCheckTest.class,
     DataStreamerRemoteSecurityContextCheckTest.class,
     CacheLoadRemoteSecurityContextCheckTest.class,
+    ContinuousQueryRemoteSecurityContextCheckTest.class,
+    ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.class,
 
     InvalidServerTest.class,