You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/01 12:09:41 UTC
[15/20] ignite git commit: IGNITE-1186 WIP
IGNITE-1186 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ad476b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ad476b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ad476b2
Branch: refs/heads/ignite-1186
Commit: 9ad476b21a1f6f9d52d8efcaad346a4ac85ea73b
Parents: e80e906
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Feb 29 20:56:35 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Feb 29 20:56:35 2016 +0300
----------------------------------------------------------------------
.../ignite/cache/query/ContinuousQuery.java | 10 +-
.../internal/GridEventConsumeHandler.java | 6 +
.../internal/GridMessageListenHandler.java | 6 +
.../processors/cache/IgniteCacheProxy.java | 13 +-
.../continuous/CacheContinuousQueryHandler.java | 9 +-
.../CacheContinuousQueryHandlerV2.java | 20 +-
.../continuous/CacheContinuousQueryManager.java | 263 ++++-
.../continuous/GridContinuousHandler.java | 6 +
.../IgniteCacheEntryListenerAbstractTest.java | 66 +-
.../CacheContinuousQueryFactoryFilterTest.java | 1002 +++---------------
.../CacheContinuousQueryOperationP2PTest.java | 394 +++++++
...acheContinuousQueryRandomOperationsTest.java | 10 +-
.../p2p/CacheDeploymentEntryEventFilter.java | 33 +
.../CacheDeploymentEntryEventFilterFactory.java | 31 +
14 files changed, 924 insertions(+), 945 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 452735e..3ea8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.query;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -121,7 +122,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
private CacheEntryEventSerializableFilter<K, V> rmtFilter;
/** Remote filter factory. */
- private Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory;
+ private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
/** Time interval. */
private long timeInterval = DFLT_TIME_INTERVAL;
@@ -200,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
*
* @param rmtFilter Key-value filter.
* @return {@code this} for chaining.
+ *
+ * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
*/
+ @Deprecated
public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
this.rmtFilter = rmtFilter;
@@ -228,7 +232,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
* @return {@code this} for chaining.
*/
public ContinuousQuery<K, V> setRemoteFilterFactory(
- Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory) {
+ Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
this.rmtFilterFactory = rmtFilterFactory;
return this;
@@ -239,7 +243,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
*
* @return Remote filter.
*/
- public Factory<? extends CacheEntryEventSerializableFilter<K, V>> getRemoteFilterFactory() {
+ public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
return rmtFilterFactory;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..924a8ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
+import javax.cache.event.CacheEntryEventFilter;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
@@ -141,6 +142,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public CacheEntryEventFilter getEventFilter() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..e157c98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import javax.cache.event.CacheEntryEventFilter;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
@@ -130,6 +131,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public CacheEntryEventFilter getEventFilter() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
ctx.io().addUserMessageListener(topic, pred);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9e9b985..690e0b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -97,16 +97,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** */
- private static final IgniteBiPredicate ACCEPT_ALL = new IgniteBiPredicate() {
- /** */
- private static final long serialVersionUID = -1640538788290240617L;
-
- @Override public boolean apply(Object k, Object v) {
- return true;
- }
- };
-
/** Context. */
private GridCacheContext<K, V> ctx;
@@ -565,6 +555,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (qry.getLocalListener() == null)
throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+ if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+ throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
try {
final UUID routineId = ctx.continuousQueries().executeQuery(
qry.getLocalListener(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 47f5c52..393f7fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -263,7 +262,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (locLsnr != null)
ctx.resource().injectGeneric(locLsnr);
- final CacheEntryEventFilter filter = getRemoteFilter();
+ final CacheEntryEventFilter filter = getEventFilter();
if (filter != null)
ctx.resource().injectGeneric(filter);
@@ -521,10 +520,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
return mgr.registerListener(routineId, lsnr, internal);
}
- /**
- * @return Remote filter.
- */
- protected CacheEntryEventFilter<K, V> getRemoteFilter() {
+ /** {@inheritDoc} */
+ public CacheEntryEventFilter getEventFilter() {
return rmtFilter;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 4573e6c..628e1c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -26,8 +26,10 @@ import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheRemoteQueryFactory;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -40,13 +42,13 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
private static final long serialVersionUID = 0L;
/** Remote filter factory. */
- private Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory;
+ private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
/** Deployable object for filter factory. */
private DeployableObject rmtFilterFactoryDep;
/** */
- protected transient CacheEntryEventFilter<K, V> rmtNonSerFilter;
+ protected transient CacheEntryEventFilter filter;
/**
* Required by {@link Externalizable}.
@@ -76,7 +78,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
String cacheName,
Object topic,
CacheEntryUpdatedListener<K, V> locLsnr,
- Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory,
+ Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
boolean internal,
boolean notifyExisting,
boolean oldValRequired,
@@ -108,14 +110,14 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
}
/** {@inheritDoc} */
- @Override protected CacheEntryEventFilter<K, V> getRemoteFilter() {
- if (rmtNonSerFilter == null) {
+ @Override public CacheEntryEventFilter getEventFilter() {
+ if (filter == null) {
assert rmtFilterFactory != null;
- rmtNonSerFilter = rmtFilterFactory.create();
+ filter = rmtFilterFactory.create();
}
- return rmtNonSerFilter;
+ return filter;
}
/** {@inheritDoc} */
@@ -168,6 +170,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
if (b)
rmtFilterFactoryDep = (DeployableObject)in.readObject();
else
- rmtFilterFactory = (Factory<CacheEntryEventSerializableFilter<K, V>>)in.readObject();
+ rmtFilterFactory = (Factory)in.readObject();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index fd4d71e..33d6d59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -21,10 +21,11 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Map;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -415,29 +416,44 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @throws IgniteCheckedException In case of error.
*/
public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
- CacheEntryEventSerializableFilter rmtFilter,
- Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
+ @Nullable CacheEntryEventSerializableFilter rmtFilter,
+ @Nullable Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory,
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
boolean loc,
boolean keepBinary) throws IgniteCheckedException
{
- return executeQuery0(
- locLsnr,
- rmtFilter,
- rmtFilterFactory,
- bufSize,
- timeInterval,
- autoUnsubscribe,
- false,
- false,
- true,
- false,
- true,
- loc,
- keepBinary,
- false);
+ if (rmtFilterFactory != null)
+ return executeQueryWithFilterFactory(
+ locLsnr,
+ rmtFilterFactory,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ false,
+ false,
+ true,
+ false,
+ true,
+ loc,
+ keepBinary,
+ false);
+ else
+ return executeQueryWithFilter(
+ locLsnr,
+ rmtFilter,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ false,
+ false,
+ true,
+ false,
+ true,
+ loc,
+ keepBinary,
+ false);
}
/**
@@ -455,10 +471,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean ignoreClassNotFound)
throws IgniteCheckedException
{
- return executeQuery0(
+ return executeQueryWithFilter(
locLsnr,
rmtFilter,
- null,
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
@@ -556,8 +571,162 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
- private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+ private UUID executeQueryWithFilter(CacheEntryUpdatedListener locLsnr,
final CacheEntryEventSerializableFilter rmtFilter,
+ int bufSize,
+ long timeInterval,
+ boolean autoUnsubscribe,
+ boolean internal,
+ boolean notifyExisting,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ boolean loc,
+ final boolean keepBinary,
+ boolean ignoreClassNotFound) throws IgniteCheckedException
+ {
+ cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
+ cctx.kernalContext().job().currentTaskNameHash() : 0;
+
+ boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
+
+ GridContinuousHandler hnd = new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilter,
+ internal,
+ notifyExisting,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ taskNameHash,
+ skipPrimaryCheck,
+ cctx.isLocal(),
+ keepBinary,
+ ignoreClassNotFound);
+
+ return executeQuery0(locLsnr,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ notifyExisting,
+ loc,
+ keepBinary,
+ hnd);
+ }
+
+
+ /**
+ * @param locLsnr Local listener.
+ * @param bufSize Buffer size.
+ * @param timeInterval Time interval.
+ * @param autoUnsubscribe Auto unsubscribe flag.
+ * @param internal Internal flag.
+ * @param notifyExisting Notify existing flag.
+ * @param oldValRequired Old value required flag.
+ * @param sync Synchronous flag.
+ * @param ignoreExpired Ignore expired event flag.
+ * @param loc Local flag.
+ * @return Continuous routine ID.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr,
+ final JCacheRemoteQueryFactory rmtFilterFactory,
+ int bufSize,
+ long timeInterval,
+ boolean autoUnsubscribe,
+ boolean internal,
+ boolean notifyExisting,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ boolean loc,
+ final boolean keepBinary,
+ boolean ignoreClassNotFound) throws IgniteCheckedException
+ {
+ cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
+ cctx.kernalContext().job().currentTaskNameHash() : 0;
+
+ boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
+
+ boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+
+ GridContinuousHandler hnd;
+
+ if (v2)
+ hnd = new CacheContinuousQueryHandlerV2(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ rmtFilterFactory,
+ internal,
+ notifyExisting,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ taskNameHash,
+ skipPrimaryCheck,
+ cctx.isLocal(),
+ keepBinary,
+ ignoreClassNotFound);
+ else {
+ JCacheQueryRemoteFilter fltr = null;
+
+ if (rmtFilterFactory != null) {
+ fltr = rmtFilterFactory.create();
+
+ if (!(fltr.impl instanceof Serializable))
+ throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
+ "EntryEventFilter must implement java.io.Serializable interface. Filter: " + fltr.impl);
+ }
+
+ hnd = new CacheContinuousQueryHandler(
+ cctx.name(),
+ TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+ locLsnr,
+ fltr,
+ internal,
+ notifyExisting,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ taskNameHash,
+ skipPrimaryCheck,
+ cctx.isLocal(),
+ keepBinary,
+ ignoreClassNotFound);
+ }
+
+ return executeQuery0(locLsnr,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ notifyExisting,
+ loc,
+ keepBinary,
+ hnd);
+ }
+
+ /**
+ * @param locLsnr Local listener.
+ * @param bufSize Buffer size.
+ * @param timeInterval Time interval.
+ * @param autoUnsubscribe Auto unsubscribe flag.
+ * @param internal Internal flag.
+ * @param notifyExisting Notify existing flag.
+ * @param oldValRequired Old value required flag.
+ * @param sync Synchronous flag.
+ * @param ignoreExpired Ignore expired event flag.
+ * @param loc Local flag.
+ * @return Continuous routine ID.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr,
final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
int bufSize,
long timeInterval,
@@ -582,9 +751,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
GridContinuousHandler hnd;
- if (v2) {
- assert rmtFilter == null : rmtFilter;
-
+ if (v2)
hnd = new CacheContinuousQueryHandlerV2(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -600,7 +767,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.isLocal(),
keepBinary,
ignoreClassNotFound);
- }
else {
CacheEntryEventFilter fltr = null;
@@ -608,8 +774,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
fltr = rmtFilterFactory.create();
if (!(fltr instanceof CacheEntryEventSerializableFilter))
- throw new IgniteCheckedException("Cache entry event filter must implement " +
- "org.apache.ignite.cache.CacheEntryEventSerializableFilter: " + fltr);
+ throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
+ "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " +
+ "interface. Filter: " + fltr);
}
hnd = new CacheContinuousQueryHandler(
@@ -629,6 +796,37 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
ignoreClassNotFound);
}
+ return executeQuery0(locLsnr,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ notifyExisting,
+ loc,
+ keepBinary,
+ hnd);
+ }
+
+ /**
+ * @param locLsnr Local listener.
+ * @param bufSize Buffer size.
+ * @param timeInterval Time interval.
+ * @param autoUnsubscribe Auto unsubscribe flag.
+ * @param notifyExisting Notify existing flag.
+ * @param loc Local flag.
+ * @param keepBinary Keep binary.
+ * @param hnd Handler.
+ * @return Continuous routine ID.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+ int bufSize,
+ long timeInterval,
+ boolean autoUnsubscribe,
+ boolean notifyExisting,
+ boolean loc,
+ final boolean keepBinary,
+ final GridContinuousHandler hnd)
+ throws IgniteCheckedException {
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -694,7 +892,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.kernalContext().cache().jcache(cctx.name()),
cctx, entry);
- if (rmtFilter != null && !rmtFilter.evaluate(next))
+ if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
next = null;
}
}
@@ -825,9 +1023,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
locLsnrImpl,
log);
- routineId = executeQuery0(
+ routineId = executeJCacheQueryFactory(
locLsnr,
- null,
new JCacheRemoteQueryFactory(cfg.getCacheEntryEventFilterFactory(), types),
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
@@ -1026,12 +1223,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
*
*/
- private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
+ protected static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
/** */
private static final long serialVersionUID = 0L;
/** Factory. */
- private Factory<CacheEntryEventFilter> impl;
+ protected Factory<CacheEntryEventFilter> impl;
/** */
private byte types;
@@ -1046,7 +1243,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/** {@inheritDoc} */
- @Override public CacheEntryEventFilter create() {
+ @Override public JCacheQueryRemoteFilter create() {
return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..232e1ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import javax.cache.event.CacheEntryEventFilter;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -158,4 +159,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
* @param topVer Topology version.
*/
public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
+
+ /**
+ * @return Cache entry filter.
+ */
+ public CacheEntryEventFilter getEventFilter();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 62a4153..e61127d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
@@ -62,6 +63,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
@@ -99,6 +101,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/** */
private boolean useObjects;
+ /** */
+ private static AtomicBoolean serialized = new AtomicBoolean(false);
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -138,6 +143,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
assertEquals(0, syncMsgFuts.size());
}
+
+ serialized.set(false);
}
/**
@@ -439,18 +446,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
FactoryBuilder.factoryOf(lsnr),
- null,
+ new SerializableFactory(),
true,
false
));
try {
startGrid(gridCount());
+
+ jcache(0).put(1, 1);
}
finally {
stopGrid(gridCount());
}
+ jcache(0).put(2, 2);
+
+ assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
assertFalse(serialized.get());
}
@@ -527,16 +539,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
* @throws Exception If failed.
*/
- public void testEventsObjectKeyValue() throws Exception {
+ public void _testEventsObjectKeyValue() throws Exception {
useObjects = true;
- testEvents();
+ _testEvents();
}
/**
* @throws Exception If failed.
*/
- public void testEvents() throws Exception {
+ public void _testEvents() throws Exception {
IgniteCache<Object, Object> cache = jcache();
Map<Object, Object> vals = new HashMap<>();
@@ -1126,9 +1138,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
+ private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
/** {@inheritDoc} */
- @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
+ @Override public CacheEntryEventFilter<Object, Object> create() {
return new TestFilter();
}
}
@@ -1180,7 +1192,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
/**
*
*/
- private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
assert evt != null;
@@ -1197,6 +1209,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
return key % 2 == 0;
}
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Filter muns't be marshaled.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Filter muns't be unmarshaled.");
+ }
}
/**
@@ -1351,6 +1373,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ *
+ */
+ public static class SerializableFactory implements Factory<NonSerializableFilter> {
+ /** {@inheritDoc} */
+ @Override public NonSerializableFilter create() {
+ return new NonSerializableFilter();
+ }
+ }
+
+ /**
+ *
+ */
+ public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+ return true;
+ }
+ }
+
+ /**
*/
public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
index 2ff2b79..d6d30ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -21,69 +21,47 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
-import javax.cache.configuration.FactoryBuilder.SingletonFactory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.integration.CacheLoaderException;
-import javax.cache.integration.CacheWriterException;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.QueryTestKey;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.QueryTestValue;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
-import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.ALL;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.CLIENT;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.SERVER;
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
@@ -91,10 +69,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
*/
-public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTest {
- /** */
- private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
+public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
/** */
private static final int NODES = 5;
@@ -105,488 +80,10 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
private static final int VALS = 10;
/** */
- public static final int ITERATION_CNT = 100;
-
- /** */
- private boolean client;
+ public static final int ITERATION_CNT = 40;
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
- cfg.setClientMode(client);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGridsMultiThreaded(NODES - 1);
-
- client = true;
-
- startGrid(NODES - 1);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
-
- super.afterTestsStopped();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomic() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicReplicated() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
- 0,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicReplicatedAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
- 0,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicReplicatedClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
- 0,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicOffheapValues() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicOffheapValuesAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicOffheapValuesClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicOffheapTiered() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicOffheapTieredAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicOffheapTieredClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- ATOMIC,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicNoBackups() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicNoBackupsAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAtomicNoBackupsClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- ATOMIC,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTx() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxExplicit() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxClientExplicit() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxReplicated() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
- 0,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxReplicatedClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
- 0,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapValues() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapValuesAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapValuesExplicit() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapValuesClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_VALUES,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapTiered() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapTieredAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapTieredClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxOffheapTieredClientExplicit() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 1,
- TRANSACTIONAL,
- OFFHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxNoBackups() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxNoBackupsAllNodes() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, ALL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxNoBackupsExplicit() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, SERVER);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxNoBackupsClient() throws Exception {
- CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
- 0,
- TRANSACTIONAL,
- ONHEAP_TIERED,
- false);
-
- testContinuousQuery(ccfg, CLIENT);
- }
-
- /**
- * @param ccfg Cache configuration.
- * @param deploy The place where continuous query will be started.
- * @throws Exception If failed.
- */
- private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+ @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
throws Exception {
ignite(0).createCache(ccfg);
@@ -601,70 +98,18 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
Collection<QueryCursor<?>> curs = new ArrayList<>();
- if (deploy == CLIENT) {
- ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
-
- final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
- qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
- ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
- for (CacheEntryEvent<?, ?> evt : evts)
- evtsQueue.add(evt);
- }
- });
-
- qry.setRemoteFilterFactory(new FilterFactory());
-
- evtsQueues.add(evtsQueue);
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
- QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
-
- curs.add(cur);
- }
- else if (deploy == SERVER) {
- ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
-
- final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
- qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
- ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
- for (CacheEntryEvent<?, ?> evt : evts)
- evtsQueue.add(evt);
- }
- });
-
- qry.setRemoteFilterFactory(new FilterFactory());
-
- evtsQueues.add(evtsQueue);
-
- QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
-
- curs.add(cur);
- }
+ if (deploy == CLIENT)
+ evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+ else if (deploy == SERVER)
+ evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+ rnd.nextBoolean()));
else {
- for (int i = 0; i < NODES - 1; i++) {
- ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+ boolean isSync = rnd.nextBoolean();
- final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
- qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
- ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
- for (CacheEntryEvent<?, ?> evt : evts)
- evtsQueue.add(evt);
- }
- });
-
- qry.setRemoteFilterFactory(new FilterFactory());
-
- evtsQueues.add(evtsQueue);
-
- QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry);
-
- curs.add(cur);
- }
+ for (int i = 0; i < NODES - 1; i++)
+ evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
}
ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
@@ -673,7 +118,7 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
try {
for (int i = 0; i < ITERATION_CNT; i++) {
- if (i % 20 == 0)
+ if (i % 10 == 0)
log.info("Iteration: " + i);
for (int idx = 0; idx < NODES; idx++)
@@ -683,6 +128,9 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
finally {
for (QueryCursor<?> cur : curs)
cur.close();
+
+ for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+ grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
}
}
finally {
@@ -691,6 +139,60 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
}
/**
+ * Subscribes to cache events on the given node, choosing at random between a JCache entry
+ * listener configuration and a {@link ContinuousQuery}. Both variants use a {@link FilterFactory}
+ * and add every received event to the returned queue.
+ *
+ * @param cacheName Cache name.
+ * @param nodeIdx Node index.
+ * @param curs Cursors; the opened query cursor is added here when a continuous query is used.
+ * @param lsnrCfgs Listener configurations; the node index and the configuration are added here
+ * when a JCache listener is used.
+ * @param sync Synchronous flag passed to the JCache listener configuration; not used by the
+ * continuous query variant.
+ * @return Event queue
+ */
+ private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+ int nodeIdx,
+ Collection<QueryCursor<?>> curs,
+ Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+ boolean sync) {
+ final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+ // Randomly pick the subscription mechanism: JCache listener (true) or continuous query (false).
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+ new MutableCacheEntryListenerConfiguration<>(
+ FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+ @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ }),
+ new FilterFactory(),
+ // Old value is required.
+ true,
+ sync
+ );
+
+ grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+ // Remember the node together with the configuration so the caller can deregister the listener.
+ lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+ }
+ else {
+ ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> evt : evts)
+ evtsQueue.add(evt);
+ }
+ });
+
+ qry.setRemoteFilterFactory(new FilterFactory());
+
+ QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+ // Hand the cursor to the caller, which is responsible for closing it.
+ curs.add(cur);
+ }
+
+ return evtsQueue;
+ }
+
+ /**
* @param rnd Random generator.
* @param evtsQueues Events queue.
* @param expData Expected cache data.
@@ -927,49 +429,6 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
break;
}
- case 11: {
- SortedMap<Object, Object> vals = new TreeMap<>();
-
- while (vals.size() < KEYS / 5)
- vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd));
-
- cache.putAll(vals);
-
- if (tx != null)
- tx.commit();
-
- for (Map.Entry<Object, Object> e : vals.entrySet())
- updatePartitionCounter(cache, e.getKey(), partCntr);
-
- waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
-
- expData.putAll(vals);
-
- break;
- }
-
- case 12: {
- SortedMap<Object, Object> vals = new TreeMap<>();
-
- while (vals.size() < KEYS / 5)
- vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal);
-
- cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
-
- if (tx != null)
- tx.commit();
-
- for (Map.Entry<Object, Object> e : vals.entrySet())
- updatePartitionCounter(cache, e.getKey(), partCntr);
-
- waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
-
- for (Object o : vals.keySet())
- expData.put(o, newVal);
-
- break;
- }
-
default:
fail("Op:" + op);
}
@@ -980,76 +439,6 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
}
/**
- * @param evtsQueues Queue.
- * @param partCntrs Counters.
- * @param aff Affinity.
- * @param vals Values.
- * @param expData Expected data.
- */
- private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
- Map<Integer, Long> partCntrs,
- Affinity<Object> aff,
- SortedMap<Object, Object> vals,
- Map<Object, Object> expData)
- throws Exception {
- Map<Object, Object> vals0 = new HashMap<>(vals);
-
- for (Map.Entry<Object, Object> e : vals0.entrySet()) {
- if (!isAccepted((QueryTestValue)e.getValue()))
- vals.remove(e.getKey());
- }
-
- for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
- Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>();
-
- for (int i = 0; i < vals.size(); i++) {
- CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
-
- try {
- assertNotNull(evt);
- }
- catch (Throwable e) {
- int z = 0;
-
- ++z;
- }
-
- rcvEvts.put(evt.getKey(), evt);
- }
-
- assertEquals(vals.size(), rcvEvts.size());
-
- for (Map.Entry<Object, Object> e : vals.entrySet()) {
- Object key = e.getKey();
- Object val = e.getValue();
- Object oldVal = expData.get(key);
-
- if (val == null && oldVal == null) {
- checkNoEvent(evtsQueues);
-
- continue;
- }
-
- CacheEntryEvent evt = rcvEvts.get(key);
-
- assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']',
- evt);
- assertEquals(key, evt.getKey());
- assertEquals(val, evt.getValue());
- assertEquals(oldVal, evt.getOldValue());
-
- long cntr = partCntrs.get(aff.partition(key));
- CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class);
-
- assertNotNull(cntr);
- assertNotNull(qryEntryEvt);
-
- assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
- }
- }
- }
-
- /**
* @param rnd {@link Random}.
* @return {@link TransactionIsolation}.
*/
@@ -1153,230 +542,99 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes
/**
*
- * @param cacheMode Cache mode.
- * @param backups Number of backups.
- * @param atomicityMode Cache atomicity mode.
- * @param memoryMode Cache memory mode.
- * @param store If {@code true} configures dummy cache store.
- * @return Cache configuration.
- */
- private CacheConfiguration<Object, Object> cacheConfiguration(
- CacheMode cacheMode,
- int backups,
- CacheAtomicityMode atomicityMode,
- CacheMemoryMode memoryMode,
- boolean store) {
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setAtomicityMode(atomicityMode);
- ccfg.setCacheMode(cacheMode);
- ccfg.setMemoryMode(memoryMode);
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setAtomicWriteOrderMode(PRIMARY);
-
- if (cacheMode == PARTITIONED)
- ccfg.setBackups(backups);
-
- if (store) {
- ccfg.setCacheStoreFactory(new TestStoreFactory());
- ccfg.setReadThrough(true);
- ccfg.setWriteThrough(true);
- }
-
- return ccfg;
- }
-
- /**
- *
- */
- private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public CacheStore<Object, Object> create() {
- return new CacheStoreAdapter() {
- @Override public Object load(Object key) throws CacheLoaderException {
- return null;
- }
-
- @Override public void write(Cache.Entry entry) throws CacheWriterException {
- // No-op.
- }
-
- @Override public void delete(Object key) throws CacheWriterException {
- // No-op.
- }
- };
- }
- }
-
- /**
- *
*/
- static class QueryTestKey implements Serializable, Comparable {
+ protected static class NonSerializableFilter
+ implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
/** */
- private final Integer key;
-
- /**
- * @param key Key.
- */
- public QueryTestKey(Integer key) {
- this.key = key;
+ public NonSerializableFilter() {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- QueryTestKey that = (QueryTestKey)o;
-
- return key.equals(that.key);
+ @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
+ throws CacheEntryListenerException {
+ return isAccepted(event.getValue());
}
/** {@inheritDoc} */
- @Override public int hashCode() {
- return key.hashCode();
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ fail("Entry filter should not be marshaled.");
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(QueryTestKey.class, this);
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fail("Entry filter should not be marshaled.");
}
- /** {@inheritDoc} */
- @Override public int compareTo(Object o) {
- return key - ((QueryTestKey)o).key;
+ /**
+ * @return {@code True} if value is even.
+ */
+ public static boolean isAccepted(QueryTestValue val) {
+ return val == null || val.val1 % 2 == 0;
}
}
/**
*
*/
- static class QueryTestValue implements Serializable {
- /** */
- private final Integer val1;
-
- /** */
- private final String val2;
-
- /**
- * @param val Value.
- */
- public QueryTestValue(Integer val) {
- this.val1 = val;
- this.val2 = String.valueOf(val);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- QueryTestValue that = (QueryTestValue) o;
-
- return val1.equals(that.val1) && val2.equals(that.val2);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = val1.hashCode();
-
- res = 31 * res + val2.hashCode();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(QueryTestValue.class, this);
+ protected static class FilterFactory implements Factory<NonSerializableFilter> {
+ /** {@inheritDoc} */
+ @Override public NonSerializableFilter create() {
+ // A new filter instance is created on every call.
+ return new NonSerializableFilter();
}
}
/**
*
*/
- protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+ public abstract class LocalNonSerialiseListener implements
+ CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+ CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+ CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+ Externalizable {
/** */
- private Object val;
-
- /** */
- private boolean retOld;
-
- /**
- * @param val Value to set.
- * @param retOld Return old value flag.
- */
- public EntrySetValueProcessor(Object val, boolean retOld) {
- this.val = val;
- this.retOld = retOld;
+ public LocalNonSerialiseListener() {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
- Object old = retOld ? e.getValue() : null;
-
- if (val != null)
- e.setValue(val);
- else
- e.remove();
-
- return old;
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(EntrySetValueProcessor.class, this);
+ @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
}
- }
-
- /**
- *
- */
- protected enum ContinuousDeploy {
- CLIENT, SERVER, ALL
- }
- /**
- *
- */
- protected static class NonSerializableFilter
- implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
/** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
- throws CacheEntryListenerException {
- return isAccepted(event.getValue());
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- fail("Entry filter should not be marshaled.");
+ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
}
/** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- fail("Entry filter should not be marshaled.");
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+ onEvents(evts);
}
/**
- * @return {@code True} if value is even.
+ * @param evts Events.
*/
- public static boolean isAccepted(QueryTestValue val) {
- return val == null ? true : val.val1 % 2 == 0;
+ protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+ ? extends QueryTestValue>> evts);
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
}
- }
- /**
- *
- */
- protected static class FilterFactory implements Factory<NonSerializableFilter> {
- @Override public NonSerializableFilter create() {
- return new NonSerializableFilter();
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
new file mode 100644
index 0000000..ff8d0a7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
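+/**
+ * Tests cache entry listeners whose event filter factory class is loaded from an external class loader,
+ * with peer class loading enabled on all nodes.
+ */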
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 5;
+
+ /** */
+ private static final int KEYS = 50;
+
+ /** */
+ private static final int VALS = 10;
+
+ /** */
+ public static final int ITERATION_CNT = 100;
+
+ /** */
+ private boolean client;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the inherited configuration this enables peer class loading, applies the current
+     * {@code client} flag and points discovery at the shared IP finder.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        igniteCfg.setPeerClassLoadingEnabled(true);
+        igniteCfg.setClientMode(client);
+
+        // All nodes of the test share one static IP finder.
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        return igniteCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts {@code NODES - 1} nodes while the {@code client} flag is still {@code false}, then raises the
+     * flag and starts one more node with index {@code NODES - 1}.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final int clientIdx = NODES - 1;
+
+        // Nodes with indexes [0, clientIdx) are started in server mode.
+        startGridsMultiThreaded(clientIdx);
+
+        // Every configuration built from now on is a client one.
+        client = true;
+
+        startGrid(clientIdx);
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Calls {@code stopAllGrids()} before delegating to the inherited cleanup.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+    /**
+     * Runs the scenario on a partitioned atomic cache with one backup, working through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicClient() throws Exception {
+        testContinuousQuery(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED), true);
+    }
+
+    /**
+     * Runs the scenario on a partitioned atomic cache with one backup, working through a random
+     * non-client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        testContinuousQuery(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED), false);
+    }
+
+    /**
+     * Runs the scenario on a replicated atomic cache, working through a random non-client node.
+     * The backups argument is not applied for replicated caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        testContinuousQuery(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED), false);
+    }
+
+    /**
+     * Runs the scenario on a replicated atomic cache, working through the client node.
+     * The backups argument is not applied for replicated caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        testContinuousQuery(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED), true);
+    }
+
+    /**
+     * Runs the scenario on a partitioned transactional cache with one backup, working through a random
+     * non-client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        testContinuousQuery(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED), false);
+    }
+    /**
+     * Runs the scenario on a partitioned transactional cache with one backup, working through the
+     * client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        testContinuousQuery(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED), true);
+    }
+
+    /**
+     * Runs the scenario on a replicated transactional cache, working through a random non-client node.
+     * The backups argument is not applied for replicated caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        testContinuousQuery(cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED), false);
+    }
+
+    /**
+     * Runs the scenario on a replicated transactional cache, working through the client node.
+     * The backups argument is not applied for replicated caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        testContinuousQuery(cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED), true);
+    }
+
+    /**
+     * Creates the cache, registers an entry listener whose event filter factory is instantiated from a class
+     * loaded by the external class loader, performs a fixed number of puts and waits until the listener has
+     * been notified the same number of times.
+     * <p>
+     * The listener is deregistered and the cache is destroyed when the scenario finishes, so that every test
+     * method starts from a clean state and can create a cache with the same name again.
+     *
+     * @param ccfg Cache configuration.
+     * @param isClient If {@code true} the node with index {@code NODES - 1} is used, otherwise a random node
+     *      with a smaller index.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked") // The factory class is only known by name, so the casts cannot be checked.
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
+        throws Exception {
+        // Number of puts and, consequently, of expected listener notifications.
+        final int putsCnt = 10;
+
+        ignite(0).createCache(ccfg);
+
+        try {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+                (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                    loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+            final CountDownLatch latch = new CountDownLatch(putsCnt);
+
+            CacheEntryUpdatedListener<Integer, Integer> localLsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
+                    ? extends Integer>> evts) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                        latch.countDown();
+                }
+            };
+
+            MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+                new MutableCacheEntryListenerConfiguration<>(
+                    new FactoryBuilder.SingletonFactory<>(localLsnr),
+                    (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+                        (Object)evtFilterFactory.newInstance(),
+                    true,
+                    true
+                );
+
+            // TODO: the continuous query path is disabled in this revision; the query is configured but not
+            // executed. When it is enabled, run it with cache.query(qry) and close the returned cursor.
+            ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(localLsnr);
+
+            qry.setRemoteFilterFactory(
+                (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+            IgniteCache<Integer, Integer> cache;
+
+            if (isClient)
+                cache = grid(NODES - 1).cache(ccfg.getName());
+            else
+                cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            // Deregister only a listener that has actually been registered.
+            try {
+                for (int i = 0; i < putsCnt; i++)
+                    cache.put(i, i);
+
+                assertTrue(latch.await(3, TimeUnit.SECONDS));
+            }
+            finally {
+                cache.deregisterCacheEntryListener(lsnrCfg);
+            }
+        }
+        finally {
+            // Without this the next test method would find the cache already started.
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Builds a cache configuration with {@code FULL_SYNC} write synchronization and {@code PRIMARY} atomic
+     * write order. The number of backups is applied for {@code PARTITIONED} caches only.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(cacheMode);
+
+        if (PARTITIONED == cacheMode)
+            cfg.setBackups(backups);
+
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setAtomicWriteOrderMode(PRIMARY);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setMemoryMode(memoryMode);
+
+        return cfg;
+    }
+
+    /**
+     * Serializable test key wrapping a single {@link Integer}. Equality, hash code and ordering are all
+     * defined by the wrapped value.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /**
+         * Orders keys by the wrapped value.
+         *
+         * @param o Other key, must be a {@code QueryTestKey}.
+         * @return Negative, zero or positive number as this key is less than, equal to or greater than the other.
+         */
+        @Override public int compareTo(Object o) {
+            // Do not subtract the values: the difference may overflow and flip the sign.
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+    /**
+     * Serializable test value holding an {@link Integer} and its string representation.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Numeric value. */
+        protected final Integer val1;
+
+        /** String form of the numeric value, produced by {@link String#valueOf(Object)}. */
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            val1 = val;
+            val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            QueryTestValue other = (QueryTestValue)o;
+
+            if (!val1.equals(other.val1))
+                return false;
+
+            return val2.equals(other.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * val1.hashCode() + val2.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}