You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/01/17 05:42:01 UTC
[ignite] branch master updated: IGNITE-12342 Continuous Queries:
Remote filter and transformer have to run with appropriate SecurityContext
(#7125)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0a8b250 IGNITE-12342 Continuous Queries: Remote filter and transformer have to run with appropriate SecurityContext (#7125)
0a8b250 is described below
commit 0a8b2502ceabd590f3b271a2c292e95e934c2f3a
Author: Denis Garus <ga...@gmail.com>
AuthorDate: Fri Jan 17 08:41:39 2020 +0300
IGNITE-12342 Continuous Queries: Remote filter and transformer have to run with appropriate SecurityContext (#7125)
---
.../org/apache/ignite/internal/IgniteFeatures.java | 5 +-
.../internal/cluster/ClusterGroupAdapter.java | 11 +-
.../processors/cache/GridCacheAdapter.java | 8 +-
.../AbstractSecurityAwareExternalizable.java | 68 ++++++++++
.../continuous/CacheContinuousQueryManager.java | 69 ++++++++--
.../query/continuous/SecurityAwareFilter.java | 62 +++++++++
.../continuous/SecurityAwareFilterFactory.java | 52 ++++++++
.../SecurityAwareTransformerFactory.java | 67 ++++++++++
.../GridResourceProxiedIgniteInjector.java | 42 +-----
.../processors/security/SecurityUtils.java | 2 +-
...cheOperationRemoteSecurityContextCheckTest.java | 21 ++-
.../AbstractRemoteSecurityContextCheckTest.java | 122 ++++++++++++------
.../cache/ContinuousQueryPermissionCheckTest.java | 143 +++++++++++++++++++++
...tinuousQueryRemoteSecurityContextCheckTest.java | 125 ++++++++++++++++++
...tinuousQueryRemoteSecurityContextCheckTest.java | 106 +++++++++++++++
...hTransformerRemoteSecurityContextCheckTest.java | 118 +++++++++++++++++
...tryProcessorRemoteSecurityContextCheckTest.java | 2 +-
.../ScanQueryRemoteSecurityContextCheckTest.java | 2 +-
...teTaskCancelRemoteSecurityContextCheckTest.java | 1 -
...DataStreamerRemoteSecurityContextCheckTest.java | 2 +-
.../ignite/testsuites/SecurityTestSuite.java | 6 +
21 files changed, 920 insertions(+), 114 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 2266c58..956d72f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -80,7 +80,10 @@ public enum IgniteFeatures {
PME_FREE_SWITCH(19),
/** Master key change. See {@link GridEncryptionManager#changeMasterKey(String)}. */
- MASTER_KEY_CHANGE(20);
+ MASTER_KEY_CHANGE(20),
+
+ /** ContinuousQuery with security subject id support. */
+ CONT_QRY_SECURITY_AWARE(21);
/**
* Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 7579ea2..3d4b71f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteComputeImpl;
import org.apache.ignite.internal.IgniteEventsImpl;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteMessagingImpl;
import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -769,7 +770,8 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
private boolean clients;
/** Injected Ignite instance. */
- private transient IgniteKernal ignite;
+ @IgniteInstanceResource
+ private transient IgniteEx ignite;
/**
* @param cacheName Cache name.
@@ -800,13 +802,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
return false;
}
-
- /** */
- @IgniteInstanceResource
- private void ignite(Ignite ignite) {
- if (ignite != null)
- this.ignite = ignite instanceof IgniteKernal ? (IgniteKernal)ignite : IgnitionEx.gridx(ignite.name());
- }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index abbbd57..f660c6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -6688,6 +6688,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected ComputeJobContext jobCtx;
/** Injected grid instance. */
+ @IgniteInstanceResource
protected IgniteEx ignite;
/** Affinity topology version. */
@@ -6755,13 +6756,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return true;
}
-
- /** */
- @IgniteInstanceResource
- private void ignite(Ignite ignite) {
- if (ignite != null)
- this.ignite = ignite instanceof IgniteEx ? (IgniteEx)ignite : IgnitionEx.gridx(ignite.name());
- }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractSecurityAwareExternalizable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractSecurityAwareExternalizable.java
new file mode 100644
index 0000000..cc1fed0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/AbstractSecurityAwareExternalizable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Abstract security aware Externalizable.
+ */
+public abstract class AbstractSecurityAwareExternalizable<T> implements Externalizable {
+ /** Security subject id. */
+ protected UUID subjectId;
+
+ /** Original component. */
+ protected T original;
+
+ /**
+ * Default constructor.
+ */
+ protected AbstractSecurityAwareExternalizable() {
+ // No-op.
+ }
+
+ /**
+ * @param subjectId Security subject id.
+ * @param original Original component.
+ */
+ protected AbstractSecurityAwareExternalizable(UUID subjectId, T original) {
+ this.subjectId = requireNonNull(subjectId, "Parameter 'subjectId' cannot be null.");
+ this.original = requireNonNull(original, "Parameter 'original' cannot be null.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeUuid(out, subjectId);
+
+ out.writeObject(original);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ subjectId = U.readUuid(in);
+
+ original = (T)in.readObject();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index d2fecbf..ecb32ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
import javax.cache.Cache;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
@@ -86,11 +87,13 @@ import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.IgniteFeatures.CONT_QRY_SECURITY_AWARE;
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
/**
* Continuous queries manager.
*/
-public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
+public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
/** */
private static final byte CREATED_FLAG = 0b0001;
@@ -514,9 +517,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
*/
public UUID executeQuery(@Nullable final CacheEntryUpdatedListener locLsnr,
@Nullable final EventListener locTransLsnr,
- @Nullable final CacheEntryEventSerializableFilter rmtFilter,
- @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
- @Nullable final Factory<? extends IgniteClosure> rmtTransFactory,
+ @Nullable final CacheEntryEventSerializableFilter<K, V> rmtFilter,
+ @Nullable final Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory,
+ @Nullable final Factory<IgniteClosure<K, V>> rmtTransFactory,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
@@ -533,8 +536,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
locTransLsnr,
- rmtFilterFactory,
- rmtTransFactory,
+ securityAwareFilterFactory(rmtFilterFactory),
+ securityAwareTransformerFactory(rmtTransFactory),
true,
false,
!includeExpired,
@@ -549,7 +552,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
locLsnr,
- rmtFilterFactory,
+ securityAwareFilterFactory(rmtFilterFactory),
true,
false,
!includeExpired,
@@ -567,7 +570,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
locLsnr,
- rmtFilter,
+ securityAwareFilter(rmtFilter),
true,
false,
!includeExpired,
@@ -852,6 +855,50 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param factory Original factory.
+ * @return Security aware factory.
+ */
+ private Factory<IgniteClosure<K, V>> securityAwareTransformerFactory(Factory<IgniteClosure<K, V>> factory) {
+ return securityAwareComponent(factory, SecurityAwareTransformerFactory::new);
+ }
+
+ /**
+ * @param factory Original factory.
+ * @return Security aware factory.
+ */
+ private Factory<CacheEntryEventFilter<K, V>> securityAwareFilterFactory(Factory<CacheEntryEventFilter<K, V>> factory) {
+ return securityAwareComponent(factory, SecurityAwareFilterFactory::new);
+ }
+
+ /**
+ * @param filter Original filter.
+ * @return Security aware filter.
+ */
+ private CacheEntryEventSerializableFilter<K, V> securityAwareFilter(CacheEntryEventSerializableFilter<K, V> filter) {
+ return securityAwareComponent(filter, SecurityAwareFilter::new);
+ }
+
+ /**
+ * @param component Original component.
+ * @param f Function that converts the original component to a security aware component.
+ * @return Security aware component.
+ */
+ private <T> T securityAwareComponent(T component, BiFunction<UUID, T, T> f) {
+ if (component == null)
+ return null;
+
+ GridKernalContext ctx = cctx.kernalContext();
+
+ if (ctx.security().enabled() && allNodesSupports(ctx.discovery().allNodes(), CONT_QRY_SECURITY_AWARE)) {
+ final UUID subjectId = ctx.security().securityContext().subject().id();
+
+ return f.apply(subjectId, component);
+ }
+
+ return component;
+ }
+
+ /**
* @param keepBinary Keep binary flag.
* @param filter Filter.
* @return Iterable for events created for existing cache entries.
@@ -1057,14 +1104,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
new IgniteOutClosure<CacheContinuousQueryHandler>() {
@Override public CacheContinuousQueryHandler apply() {
CacheContinuousQueryHandler hnd;
- Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
+ Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
if (rmtFilterFactory != null)
hnd = new CacheContinuousQueryHandlerV2(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
locLsnr,
- rmtFilterFactory,
+ securityAwareFilterFactory(rmtFilterFactory),
cfg.isOldValueRequired(),
cfg.isSynchronous(),
false,
@@ -1090,7 +1137,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
locLsnr,
- jCacheFilter,
+ securityAwareFilter(jCacheFilter),
cfg.isOldValueRequired(),
cfg.isSynchronous(),
false,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
new file mode 100644
index 0000000..fe7cca4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Security aware remote filter.
+ */
+public class SecurityAwareFilter<K, V> extends AbstractSecurityAwareExternalizable<CacheEntryEventFilter<K, V>>
+ implements CacheEntryEventSerializableFilter<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Ignite. */
+ @IgniteInstanceResource
+ private transient IgniteEx ignite;
+
+ /**
+ * Default constructor.
+ */
+ public SecurityAwareFilter() {
+ // No-op.
+ }
+
+ /**
+ * @param subjectId Security subject id.
+ * @param original Original filter.
+ */
+ public SecurityAwareFilter(UUID subjectId, CacheEntryEventFilter<K, V> original) {
+ super(subjectId, original);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) throws CacheEntryListenerException {
+ try (OperationSecurityContext c = ignite.context().security().withContext(subjectId)) {
+ return original.evaluate(evt);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilterFactory.java
new file mode 100644
index 0000000..0460e98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+
+/**
+ * Security aware remote filter factory.
+ */
+public class SecurityAwareFilterFactory<K, V> extends
+ AbstractSecurityAwareExternalizable<Factory<CacheEntryEventFilter<K, V>>> implements
+ Factory<CacheEntryEventFilter<K, V>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default constructor.
+ */
+ public SecurityAwareFilterFactory() {
+ // No-op.
+ }
+
+ /**
+ * @param subjectId Security subject id.
+ * @param original Original factory.
+ */
+ public SecurityAwareFilterFactory(UUID subjectId, Factory<CacheEntryEventFilter<K, V>> original) {
+ super(subjectId, original);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter<K, V> create() {
+ return new SecurityAwareFilter<>(subjectId, original.create());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
new file mode 100644
index 0000000..b765464
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Security aware transformer factory.
+ */
+public class SecurityAwareTransformerFactory<E, R> extends
+ AbstractSecurityAwareExternalizable<Factory<IgniteClosure<E, R>>> implements Factory<IgniteClosure<E, R>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default constructor.
+ */
+ public SecurityAwareTransformerFactory() {
+ // No-op.
+ }
+
+ /**
+ * @param subjectId Security subject id.
+ * @param original Original factory.
+ */
+ public SecurityAwareTransformerFactory(UUID subjectId, Factory<IgniteClosure<E, R>> original) {
+ super(subjectId, original);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteClosure<E, R> create() {
+ final IgniteClosure<E, R> cl = original.create();
+
+ return new IgniteClosure<E, R>() {
+ /** Ignite. */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /** {@inheritDoc} */
+ @Override public R apply(E e) {
+ try (OperationSecurityContext c = ignite.context().security().withContext(subjectId)) {
+ return cl.apply(e);
+ }
+ }
+ };
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java
index de9dc45..1a19de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProxiedIgniteInjector.java
@@ -17,39 +17,16 @@
package org.apache.ignite.internal.processors.resource;
-import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteRunnable;
+import static org.apache.ignite.internal.processors.security.SecurityUtils.isSystemType;
import static org.apache.ignite.internal.processors.security.sandbox.SandboxIgniteComponentProxy.proxy;
/** Ignite instance injector. */
public class GridResourceProxiedIgniteInjector extends GridResourceBasicInjector<Ignite> {
- /** Array of classes that should get a proxied instance of Ignite. */
- private static final Class[] PROXIED_CLASSES = new Class[]{
- Runnable.class,
- IgniteRunnable.class,
- Callable.class,
- IgniteCallable.class,
- ComputeTask.class,
- ComputeJob.class,
- IgniteClosure.class,
- IgniteBiClosure.class,
- IgniteDataStreamer.class,
- IgnitePredicate.class,
- IgniteBiPredicate.class,
- };
-
/**
* @param rsrc Resource.
*/
@@ -59,19 +36,8 @@ public class GridResourceProxiedIgniteInjector extends GridResourceBasicInjector
/** */
private Ignite ignite(Object target) {
- return shouldUseProxy(target) ? proxy(Ignite.class, getResource()) : getResource();
- }
-
- /**
- * @return True if {@code target} should get a proxy instance of Ignite.
- */
- private boolean shouldUseProxy(Object target){
- for (Class cls : PROXIED_CLASSES) {
- if (cls.isInstance(target))
- return true;
- }
-
- return false;
+ return !isSystemType(((IgniteEx)getResource()).context(), target)
+ ? proxy(Ignite.class, getResource()) : getResource();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
index d2b1eaf..298eddb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java
@@ -170,7 +170,7 @@ public class SecurityUtils {
/**
* @return True if class of {@code target} is a system type.
*/
- private static boolean isSystemType(GridKernalContext ctx, Object target) {
+ public static boolean isSystemType(GridKernalContext ctx, Object target) {
Class cls = target instanceof GridInternalWrapper
? ((GridInternalWrapper)target).userObject().getClass()
: target.getClass();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java
index f7380c2..97ebcc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractCacheOperationRemoteSecurityContextCheckTest.java
@@ -48,13 +48,14 @@ public abstract class AbstractCacheOperationRemoteSecurityContextCheckTest exten
}
/**
- * Getting the key that is contained on primary partition on passed node for {@link #CACHE_NAME} cache.
+ * Getting the key that is contained on primary partition on passed node for cache.
*
* @param ignite Node.
+ * @param cacheName Cache name.
* @return Key.
*/
- protected Integer prmKey(IgniteEx ignite) {
- return findKeys(ignite.localNode(), ignite.cache(CACHE_NAME), 1, 0, 0)
+ protected Integer primaryKey(IgniteEx ignite, String cacheName) {
+ return findKeys(ignite.localNode(), ignite.cache(cacheName), 1, 0, 0)
.stream()
.findFirst()
.orElseThrow(() -> new IllegalStateException(ignite.name() + " isn't primary node for any key."));
@@ -63,10 +64,20 @@ public abstract class AbstractCacheOperationRemoteSecurityContextCheckTest exten
/**
* Getting the key that is contained on primary partition on passed node for {@link #CACHE_NAME} cache.
*
+ * @param ignite Node.
+ * @return Key.
+ */
+ protected Integer primaryKey(IgniteEx ignite) {
+ return primaryKey(ignite, CACHE_NAME);
+ }
+
+ /**
+ * Getting the key that is contained on primary partition on passed node for {@link #CACHE_NAME} cache.
+ *
* @param nodeName Node name.
* @return Key.
*/
- protected Integer prmKey(String nodeName) {
- return prmKey((IgniteEx)G.ignite(nodeName));
+ protected Integer primaryKey(String nodeName) {
+ return primaryKey((IgniteEx)G.ignite(nodeName));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
index 9acc07e..40e23b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.processors.security;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
@@ -130,7 +130,7 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
*/
protected void runAndCheck(IgniteEx initiator, Stream<IgniteRunnable> ops) {
ops.forEach(r -> {
- VERIFIER.clear().initiator(initiator);
+ VERIFIER.initiator(initiator);
setupVerifier(VERIFIER);
@@ -147,12 +147,12 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
/**
* Map that contains an expected behaviour.
*/
- private final Map<String, T2<Integer, Integer>> expInvokes = new HashMap<>();
+ private final Map<T2<String, String>, T2<Integer, AtomicInteger>> expInvokes = new ConcurrentHashMap<>();
/**
- * List of registered security subjects.
+ * Checked errors.
*/
- private final List<T2<UUID, String>> registeredSubjects = new ArrayList<>();
+ private final Collection<String> errors = new ArrayBlockingQueue<>(10);
/**
* Expected security subject id.
@@ -160,24 +160,35 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
private UUID expSecSubjId;
/** */
- public Verifier clear() {
- registeredSubjects.clear();
+ private void clear() {
expInvokes.clear();
- expSecSubjId = null;
+ errors.clear();
- return this;
+ expSecSubjId = null;
}
/**
- * Adds expected behaivior the method {@link #register} will be invoke exp times on the node with
- * passed name.
+ * Adds expected behaivior the method {@link #register} will be invoke expected times on the node with passed
+ * name.
*
* @param nodeName Node name.
* @param num Expected number of invokes.
*/
public Verifier expect(String nodeName, int num) {
- expInvokes.put(nodeName, new T2<>(num, 0));
+ return expect(nodeName, null, num);
+ }
+
+ /**
+ * Adds expected behaivior the method {@link #register} will be invoke expected times on the node with passed
+ * name and the passed operation name.
+ *
+ * @param nodeName Node name.
+ * @param opName Operation name.
+ * @param num Expected number of invokes.
+ */
+ public Verifier expect(String nodeName, String opName, int num) {
+ expInvokes.put(new T2<>(nodeName, opName), new T2<>(num, new AtomicInteger()));
return this;
}
@@ -186,49 +197,75 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
* Registers a security subject referred for {@code localIgnite} and increments invoke counter.
*/
public void register() {
- register((IgniteEx)localIgnite());
+ register((IgniteEx)localIgnite(), null);
+ }
+
+ /**
+ * Registers a security subject referred for {@code localIgnite} with the passed operation name and increments
+ * invoke counter.
+ *
+ * @param opName Operation name.
+ */
+ public void register(String opName) {
+ register((IgniteEx)localIgnite(), opName);
}
/**
* Registers a security subject referred for the passed {@code ignite} and increments invoke counter.
+ *
+ * @param ignite Instance of ignite.
*/
- public synchronized void register(IgniteEx ignite) {
- registeredSubjects.add(new T2<>(secSubjectId(ignite), ignite.name()));
+ public void register(IgniteEx ignite) {
+ register(ignite, null);
+ }
- expInvokes.computeIfPresent(ignite.name(), (name, t2) -> {
- Integer val = t2.getValue();
+ /**
+ * Registers a security subject referred for the passed {@code ignite} with the passed operation name and
+ * increments invoke counter.
+ *
+ * @param ignite Instance of ignite.
+ * @param opName Operation name.
+ */
+ public void register(IgniteEx ignite, String opName) {
+ if (expSecSubjId == null) {
+ error("SubjectId cannot be null.");
- t2.setValue(++val);
+ return;
+ }
+
+ UUID actualSubjId = secSubjectId(ignite);
+
+ if (!expSecSubjId.equals(actualSubjId)) {
+ error("Actual subjectId does not equal expected subjectId " + "[expected=" + expSecSubjId +
+ ", actual=" + actualSubjId + "].");
+
+ return;
+ }
- return t2;
- });
+ T2<Integer, AtomicInteger> v = expInvokes.get(new T2<>(ignite.name(), opName));
+
+ if (v != null)
+ v.get2().incrementAndGet();
+ else
+ error("Unexpected registration parameters [node=" + ignite.name() + ", opName=" + opName + "].");
}
/**
* Checks result of test and clears expected behavior.
*/
public void checkResult() {
- registeredSubjects.forEach(t ->
- assertEquals("Invalide security context on node " + t.get2(),
- expSecSubjId, t.get1())
- );
-
- expInvokes.forEach((key, value) ->
- assertEquals("Node " + key + ". Execution of register: ",
- value.get1(), value.get2()));
+ if(!errors.isEmpty())
+ throw new AssertionError(errors.stream().reduce((s1, s2) -> s1 + "\n" + s2).get());
- clear();
- }
-
- /** */
- private Verifier expectSubjId(UUID expSecSubjId) {
- this.expSecSubjId = expSecSubjId;
-
- return this;
+ expInvokes.forEach((k, v) -> assertEquals("Node \"" + k.get1() + '\"' +
+ (k.get2() != null ? ", operation \"" + k.get2() + '\"' : "") +
+ ". Execution of register: ", v.get1(), Integer.valueOf(v.get2().get())));
}
/** */
public Verifier initiator(IgniteEx initiator) {
+ clear();
+
expSecSubjId = secSubjectId(initiator);
return this;
@@ -238,6 +275,13 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
private UUID secSubjectId(IgniteEx node) {
return node.context().security().securityContext().subject().id();
}
+
+ /**
+ * @param msg Error message.
+ */
+ private void error(String msg) {
+ errors.add(msg);
+ }
}
/** */
@@ -341,7 +385,7 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec
run();
if (k instanceof Cache.Entry)
- return (V) ((Cache.Entry)k).getValue();
+ return (V)((Cache.Entry)k).getValue();
return null;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/ContinuousQueryPermissionCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/ContinuousQueryPermissionCheckTest.java
new file mode 100644
index 0000000..0292c6c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/ContinuousQueryPermissionCheckTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache;
+
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.processors.security.AbstractCacheOperationPermissionCheckTest;
+import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.plugin.security.SecurityPermissionSet;
+import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.plugin.security.SecurityPermission.CACHE_READ;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Tests check a cache permission for continuous queries.
+ */
+@RunWith(JUnit4.class)
+public class ContinuousQueryPermissionCheckTest extends AbstractCacheOperationPermissionCheckTest {
+ /** Test server node name. */
+ private static final String SRV = "srv_test_node";
+
+ /** Test client node name. */
+ private static final String CLNT = "cnt_test_node";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ Ignite srv = startGridAllowAll("server");
+
+ startGrid(SRV, permissions(), true);
+
+ startGrid(CLNT, permissions(), true);
+
+ srv.cluster().state(ClusterState.ACTIVE);
+ }
+
+ /**
+ * Tests {@link ContinuousQuery} from a client.
+ */
+ @Test
+ public void testClientContinuousQuery() {
+ check(n -> startContinuousQuery(grid(CLNT), n));
+ }
+
+ /**
+ * Tests {@link ContinuousQuery} from a server.
+ */
+ @Test
+ public void testServerContinuousQuery() {
+ check(n -> startContinuousQuery(grid(SRV), n));
+ }
+
+ /**
+ * Tests {@link ContinuousQueryWithTransformer} from a client.
+ */
+ @Test
+ public void testClientContinuousQueryWithTransformer() {
+ check(n -> startContinuousQueryWithTransformer(grid(CLNT), n));
+ }
+
+ /**
+ * Tests {@link ContinuousQueryWithTransformer} from a server.
+ */
+ @Test
+ public void testServerContinuousQueryWithTransformer() {
+ check(n -> startContinuousQueryWithTransformer(grid(SRV), n));
+ }
+
+ /**
+ * @param c Consumer that accepts a cache name.
+ */
+ private void check(final Consumer<String> c) {
+ c.accept(CACHE_NAME);
+
+ assertThrowsWithCause(() -> c.accept(FORBIDDEN_CACHE), SecurityException.class);
+ }
+
+ /**
+ * Starts {@link ContinuousQuery}.
+ *
+ * @param node Ignie node.
+ * @param cacheName Cache name.
+ */
+ private void startContinuousQuery(Ignite node, String cacheName) {
+ ContinuousQuery<String, Integer> q = new ContinuousQuery<>();
+
+ q.setLocalListener(e -> {/* No-op. */});
+
+ try (QueryCursor<Cache.Entry<String, Integer>> cur = node.cache(cacheName).query(q)) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Starts {@link ContinuousQueryWithTransformer}.
+ *
+ * @param node Ignite node.
+ * @param cacheName Cache name.
+ */
+ private void startContinuousQueryWithTransformer(Ignite node, String cacheName) {
+ ContinuousQueryWithTransformer<String, Integer, String> q = new ContinuousQueryWithTransformer<>();
+
+ q.setLocalListener(e -> {/* No-op. */});
+
+ q.setRemoteTransformerFactory(() -> e -> "value");
+
+ try (QueryCursor<Cache.Entry<String, Integer>> cur = node.cache(cacheName).query(q)) {
+ // No-op.
+ }
+ }
+
+ /**
+ * @return Subject Ignite permissions.
+ */
+ private SecurityPermissionSet permissions() {
+ return SecurityPermissionSetBuilder.create()
+ .appendCachePermissions(CACHE_NAME, CACHE_READ)
+ .appendCachePermissions(FORBIDDEN_CACHE, EMPTY_PERMS).build();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/AbstractContinuousQueryRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/AbstractContinuousQueryRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..090cb4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/AbstractContinuousQueryRemoteSecurityContextCheckTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache.closure;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.security.AbstractCacheOperationRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+import static org.apache.ignite.Ignition.localIgnite;
+
+/**
+ * The base class for tests that check continuous queries {@link SecurityContext} on a remote node.
+ */
+public class AbstractContinuousQueryRemoteSecurityContextCheckTest extends
+ AbstractCacheOperationRemoteSecurityContextCheckTest {
+ /** Server node to change cache state. */
+ private static final String SRV = "srv";
+
+ /** Cache index. */
+ private static final AtomicInteger CACHE_INDEX = new AtomicInteger();
+
+ /** Open continuous query operation. */
+ protected static final String OPERATION_OPEN_CQ = "open_cq";
+
+ /** Init query, filter or transform operation. */
+ protected static final String OPERATION_CQ_COMPONENT = "cq_component";
+
+ /** Preidacte for inital query tests. */
+ protected static final IgniteBiPredicate<Integer, Integer> INITIAL_QUERY_FILTER = (k, v) -> {
+ VERIFIER.register(OPERATION_CQ_COMPONENT);
+
+ return true;
+ };
+
+ /** Remote filter. */
+ protected static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = e -> {
+ VERIFIER.register(OPERATION_CQ_COMPONENT);
+
+ return true;
+ };
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration[] getCacheConfigurations() {
+ return new CacheConfiguration[] {};
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ IgniteEx srv = startGridAllowAll(SRV);
+
+ startGridAllowAll(SRV_INITIATOR);
+
+ startClientAllowAll(CLNT_INITIATOR);
+
+ startGridAllowAll(SRV_RUN);
+
+ startClientAllowAll(CLNT_RUN);
+
+ startGridAllowAll(SRV_CHECK);
+
+ srv.cluster().state(ClusterState.ACTIVE);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void setupVerifier(Verifier verifier) {
+ verifier
+ .expect(SRV_RUN, OPERATION_OPEN_CQ, 1)
+ .expect(CLNT_RUN, OPERATION_OPEN_CQ, 1)
+ .expect(SRV_CHECK, OPERATION_CQ_COMPONENT, 2);
+ }
+
+ /**
+ * Opens query cursor.
+ *
+ * @param q {@link Query}.
+ * @param init True if needing put data to a cache before openning a cursor.
+ */
+ protected void executeQuery(Query<Cache.Entry<Integer, Integer>> q, boolean init) {
+ Ignite ignite = localIgnite();
+
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(
+ new CacheConfiguration<Integer, Integer>()
+ .setName(CACHE_NAME + CACHE_INDEX.incrementAndGet())
+ .setCacheMode(CacheMode.PARTITIONED)
+ );
+
+ if (init)
+ cache.put(primaryKey(grid(SRV_CHECK), cache.getName()), 100);
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(q)) {
+ if (!init)
+ cache.put(primaryKey(grid(SRV_CHECK), cache.getName()), 100);
+
+ cur.getAll();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..767e2ad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryRemoteSecurityContextCheckTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache.closure;
+
+import java.util.function.Consumer;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.junit.Test;
+
+/**
+ * Tests check appropriate security context when the scan query, remote filter, or
+ * remote filter factory of a {@link ContinuousQuery} is executed on a remote node.
+ * <p>
+ * The initiator node broadcasts a task to 'run' node that starts a {@link ContinuousQuery}'s component. That component
+ * is executed on 'check' node. On every step, it is performed verification that operation securitycontext is the
+ * initiator context.
+ */
+public class ContinuousQueryRemoteSecurityContextCheckTest extends
+ AbstractContinuousQueryRemoteSecurityContextCheckTest {
+ /**
+ * Test initial query of {@link ContinuousQuery}.
+ */
+ @Test
+ public void testInitialQuery() {
+ Consumer<ContinuousQuery<Integer, Integer>> consumer = new Consumer<ContinuousQuery<Integer, Integer>>() {
+ @Override public void accept(ContinuousQuery<Integer, Integer> q) {
+ q.setInitialQuery(new ScanQuery<>(INITIAL_QUERY_FILTER));
+ q.setLocalListener(e -> {/* No-op. */});
+ }
+ };
+
+ runAndCheck(grid(SRV_INITIATOR), operation(consumer, true));
+ runAndCheck(grid(CLNT_INITIATOR), operation(consumer, true));
+ }
+
+ /**
+ * Tests remote filter factory of {@link ContinuousQuery}.
+ */
+ @Test
+ public void testRemoteFilterFactory() {
+ Consumer<ContinuousQuery<Integer, Integer>> consumer = new Consumer<ContinuousQuery<Integer, Integer>>() {
+ @Override public void accept(ContinuousQuery<Integer, Integer> q) {
+ q.setRemoteFilterFactory(() -> RMT_FILTER);
+ }
+ };
+
+ runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+ runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+ }
+
+ /**
+ * Tests remote filter of {@link ContinuousQuery}.
+ */
+ @Test
+ public void testRemoteFilter() {
+ Consumer<ContinuousQuery<Integer, Integer>> consumer = new Consumer<ContinuousQuery<Integer, Integer>>() {
+ @Override public void accept(ContinuousQuery<Integer, Integer> q) {
+ q.setRemoteFilter(RMT_FILTER);
+ }
+ };
+
+ runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+ runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+ }
+
+ /**
+ * @param c Consumer that setups a {@link ContinuousQuery}.
+ * @param init True if needing put data to a cache before openning a cursor.
+ * @return Test operation.
+ */
+ private IgniteRunnable operation(Consumer<ContinuousQuery<Integer, Integer>> c, boolean init) {
+ return () -> {
+ VERIFIER.register(OPERATION_OPEN_CQ);
+
+ ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
+
+ c.accept(cq);
+
+ executeQuery(cq, init);
+ };
+ }
+
+ /**
+ * @param c Consumer that setups a {@link ContinuousQuery}.
+ * @return Test operation.
+ */
+ private IgniteRunnable operation(Consumer<ContinuousQuery<Integer, Integer>> c) {
+ return operation(c, false);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..f9ed65f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.cache.closure;
+
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.junit.Test;
+
+/**
+ * Tests check appropriate security context when the scan query, transformer factory}, or remote filter factory of a
+ * {@link ContinuousQueryWithTransformer} is executed on a remote node.
+ * <p>
+ * The initiator node broadcasts a task to 'run' node that starts a {@link ContinuousQueryWithTransformer}'s component.
+ * That component is executed on 'check' node. On every step, it is performed verification that operation
+ * securitycontext is the initiator context.
+ */
+public class ContinuousQueryWithTransformerRemoteSecurityContextCheckTest extends
+ AbstractContinuousQueryRemoteSecurityContextCheckTest {
+ /**
+ * Tests initial query of {@link ContinuousQueryWithTransformer}.
+ */
+ @Test
+ public void testInitialQuery() {
+ Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> consumer =
+ new Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>>() {
+ @Override public void accept(ContinuousQueryWithTransformer<Integer, Integer, Integer> q) {
+ q.setInitialQuery(new ScanQuery<>(INITIAL_QUERY_FILTER));
+ q.setRemoteTransformerFactory(() -> Cache.Entry::getValue);
+ q.setLocalListener(e -> {/* No-op. */});
+ }
+ };
+
+ runAndCheck(grid(SRV_INITIATOR), operation(consumer, true));
+ runAndCheck(grid(CLNT_INITIATOR), operation(consumer, true));
+ }
+
+ /**
+ * Tests remote filter factory of {@link ContinuousQueryWithTransformer}.
+ */
+ @Test
+ public void testRemoteFilterFactory() {
+ Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> consumer =
+ new Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>>() {
+ @Override public void accept(ContinuousQueryWithTransformer<Integer, Integer, Integer> q) {
+ q.setRemoteFilterFactory(() -> RMT_FILTER);
+ q.setRemoteTransformerFactory(() -> Cache.Entry::getValue);
+ }
+ };
+
+ runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+ runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+ }
+
+ /**
+ * Tests transformer factory of {@link ContinuousQueryWithTransformer}.
+ */
+ @Test
+ public void testTransformerFactory() {
+ Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> consumer =
+ new Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>>() {
+ @Override public void accept(ContinuousQueryWithTransformer<Integer, Integer, Integer> q) {
+ q.setRemoteTransformerFactory(() -> e -> {
+ VERIFIER.register(OPERATION_CQ_COMPONENT);
+
+ return e.getValue();
+ });
+ q.setLocalListener(e -> {/* No-op. */});
+ }
+ };
+
+ runAndCheck(grid(SRV_INITIATOR), operation(consumer));
+ runAndCheck(grid(CLNT_INITIATOR), operation(consumer));
+ }
+
+ /**
+ * @param c Consumer that setups a {@link ContinuousQueryWithTransformer}.
+ * @param init True if needing put data to a cache before openning a cursor.
+ * @return Test operation.
+ */
+ private IgniteRunnable operation(Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> c,
+ boolean init) {
+ return () -> {
+ VERIFIER.register(OPERATION_OPEN_CQ);
+
+ ContinuousQueryWithTransformer<Integer, Integer, Integer> cq = new ContinuousQueryWithTransformer<>();
+
+ c.accept(cq);
+
+ executeQuery(cq, init);
+ };
+ }
+
+ /**
+ * @param c Consumer that setups a {@link ContinuousQueryWithTransformer}.
+ * @return Test operation.
+ */
+ private IgniteRunnable operation(Consumer<ContinuousQueryWithTransformer<Integer, Integer, Integer>> c) {
+ return operation(c, false);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java
index 4ca1f60..facd7ec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/EntryProcessorRemoteSecurityContextCheckTest.java
@@ -87,7 +87,7 @@ public class EntryProcessorRemoteSecurityContextCheckTest extends AbstractCacheO
* @return Stream of runnables to call invoke methods.
*/
private Stream<IgniteRunnable> operations() {
- final Integer key = prmKey(grid(SRV_CHECK));
+ final Integer key = primaryKey(grid(SRV_CHECK));
return Stream.<IgniteRunnable>of(
() -> localIgnite().<Integer, Integer>cache(CACHE_NAME).invoke(key, createRunner()),
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java
index 27f7116..24035ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/cache/closure/ScanQueryRemoteSecurityContextCheckTest.java
@@ -73,7 +73,7 @@ public class ScanQueryRemoteSecurityContextCheckTest extends AbstractCacheOperat
@Test
public void test() throws Exception {
grid(SRV_INITIATOR).cache(CACHE_NAME)
- .put(prmKey(grid(SRV_CHECK)), 1);
+ .put(primaryKey(grid(SRV_CHECK)), 1);
awaitPartitionMapExchange();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
index 15c342c..c79245d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
@@ -126,7 +126,6 @@ public class ComputeTaskCancelRemoteSecurityContextCheckTest extends AbstractRem
/** */
private void checkCancel(IgniteEx initator, IgniteEx rmt, Consumer<IgniteFuture> consumer) throws Exception {
VERIFIER
- .clear()
.initiator(initator)
.expect(rmt.name(), 1);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java
index fbb82aa..fa7bb8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/datastreamer/closure/DataStreamerRemoteSecurityContextCheckTest.java
@@ -74,7 +74,7 @@ public class DataStreamerRemoteSecurityContextCheckTest extends AbstractCacheOpe
try (IgniteDataStreamer<Integer, Integer> strm = Ignition.localIgnite().dataStreamer(CACHE_NAME)) {
strm.receiver(StreamVisitor.from(new ExecRegisterAndForwardAdapter<>(endpoints())));
- strm.addData(prmKey(grid(SRV_CHECK)), 100);
+ strm.addData(primaryKey(grid(SRV_CHECK)), 100);
}
};
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 1d54e75..eb8c88b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -19,9 +19,12 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.security.InvalidServerTest;
import org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCheckTest;
+import org.apache.ignite.internal.processors.security.cache.ContinuousQueryPermissionCheckTest;
import org.apache.ignite.internal.processors.security.cache.EntryProcessorPermissionCheckTest;
import org.apache.ignite.internal.processors.security.cache.ScanQueryPermissionCheckTest;
import org.apache.ignite.internal.processors.security.cache.closure.CacheLoadRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.processors.security.cache.closure.ContinuousQueryRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.processors.security.cache.closure.ContinuousQueryWithTransformerRemoteSecurityContextCheckTest;
import org.apache.ignite.internal.processors.security.cache.closure.EntryProcessorRemoteSecurityContextCheckTest;
import org.apache.ignite.internal.processors.security.cache.closure.ScanQueryRemoteSecurityContextCheckTest;
import org.apache.ignite.internal.processors.security.client.ThinClientPermissionCheckTest;
@@ -52,6 +55,7 @@ import org.junit.runners.Suite;
EntryProcessorPermissionCheckTest.class,
ComputePermissionCheckTest.class,
ThinClientPermissionCheckTest.class,
+ ContinuousQueryPermissionCheckTest.class,
DistributedClosureRemoteSecurityContextCheckTest.class,
ComputeTaskRemoteSecurityContextCheckTest.class,
@@ -61,6 +65,8 @@ import org.junit.runners.Suite;
EntryProcessorRemoteSecurityContextCheckTest.class,
DataStreamerRemoteSecurityContextCheckTest.class,
CacheLoadRemoteSecurityContextCheckTest.class,
+ ContinuousQueryRemoteSecurityContextCheckTest.class,
+ ContinuousQueryWithTransformerRemoteSecurityContextCheckTest.class,
InvalidServerTest.class,