You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/07/02 13:47:20 UTC
[ignite] branch master updated: IGNITE-3653 P2P doesn't work for
remote filter and filter factory - Fixes #4566.
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2cea7e2 IGNITE-3653 P2P doesn't work for remote filter and filter factory - Fixes #4566.
2cea7e2 is described below
commit 2cea7e245927e8e54b93f7adea7524ce68c6d226
Author: Denis Mekhanikov <dm...@gmail.com>
AuthorDate: Tue Jul 2 16:47:07 2019 +0300
IGNITE-3653 P2P doesn't work for remote filter and filter factory - Fixes #4566.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
.../apache/ignite/cache/query/ContinuousQuery.java | 50 ++--
.../ignite/internal/GridEventConsumeHandler.java | 59 ++++-
.../ignite/internal/GridMessageListenHandler.java | 56 ++---
.../apache/ignite/internal/IgniteEventsImpl.java | 3 +
.../ignite/internal/IgniteMessagingImpl.java | 3 +
.../CacheContinuousQueryDeployableObject.java | 8 +
.../continuous/CacheContinuousQueryHandler.java | 260 +++++++++++++++-----
.../continuous/CacheContinuousQueryHandlerV2.java | 20 +-
.../continuous/CacheContinuousQueryHandlerV3.java | 22 +-
.../continuous/CacheContinuousQueryListener.java | 5 -
.../continuous/CacheContinuousQueryManager.java | 20 +-
.../continuous/GridContinuousProcessor.java | 266 +++++++++++----------
.../CacheContinuousQueryLongP2PTest.java | 154 ++++++++++++
.../CacheContinuousQueryOperationP2PTest.java | 166 ++++++++++---
...ousQueryDeserializationErrorOnNodeJoinTest.java | 110 +++++++++
.../ContinuousQueryPeerClassLoadingTest.java | 11 +-
...GridCacheContinuousQueryNodesFilteringTest.java | 20 +-
.../testsuites/IgniteCacheQuerySelfTestSuite6.java | 2 +
.../zk/ZookeeperDiscoverySpiTestSuite3.java | 6 +-
19 files changed, 908 insertions(+), 333 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 0d1444b..f898365 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -30,22 +30,22 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
/**
* API for configuring continuous cache queries.
* <p>
- * Continuous queries allow to register a remote filter and a local listener
+ * Continuous queries allow registering a remote filter and a local listener
* for cache updates. If an update event passes the filter, it will be sent to
- * the node that executed the query and local listener will be notified.
+ * the node that executed the query, and the local listener will be notified.
* <p>
- * Additionally, you can execute initial query to get currently existing data.
+ * Additionally, you can execute an initial query to get currently existing data.
* Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)}
* method.
* <p>
* Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)}
* method, or only on the local node, if {@link Query#setLocal(boolean)} parameter is set to {@code true}.
- * Note that in case query is distributed and a new node joins, it will get the remote
- * filter for the query during discovery process before it actually joins topology,
+ * Note that if the query is distributed and a new node joins, it will get the remote
+ * filter for the query during discovery process before it actually joins the topology,
* so no updates will be missed.
* <h1 class="header">Example</h1>
- * As an example, suppose we have cache with {@code 'Person'} objects and we need
- * to query all persons with salary above 1000.
+ * As an example, suppose we have a cache with {@code 'Person'} objects and we need
+ * to query for all people with salary above 1000.
* <p>
* Here is the {@code Person} class:
* <pre name="code" class="java">
@@ -60,17 +60,17 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
* }
* </pre>
* <p>
- * You can create and execute continuous query like so:
+ * You can create and execute a continuous query like so:
* <pre name="code" class="java">
- * // Create new continuous query.
+ * // Create a new continuous query.
* ContinuousQuery<Long, Person> qry = new ContinuousQuery<>();
*
- * // Initial iteration query will return all persons with salary above 1000.
+ * // Initial iteration query will return all people with salary above 1000.
* qry.setInitialQuery(new ScanQuery<>((id, p) -> p.getSalary() > 1000));
*
*
* // Callback that is called locally when update notifications are received.
- * // It simply prints out information about all created persons.
+ * // It simply prints out information about all created or modified records.
* qry.setLocalListener((evts) -> {
* for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) {
* Person p = e.getValue();
@@ -79,29 +79,29 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
* }
* });
*
- * // Continuous listener will be notified for persons with salary above 1000.
+ * // The continuous listener will be notified for people with salary above 1000.
* qry.setRemoteFilter(evt -> evt.getValue().getSalary() > 1000);
*
- * // Execute query and get cursor that iterates through initial data.
+ * // Execute the query and get a cursor that iterates through the initial data.
* QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry);
* </pre>
- * This will execute query on all nodes that have cache you are working with and
- * listener will start to receive notifications for cache updates.
+ * This will execute the query on all nodes that have the cache you are working with, and
+ * the listener will start receiving notifications for cache updates.
* <p>
* To stop receiving updates call {@link QueryCursor#close()} method:
* <pre name="code" class="java">
* cur.close();
* </pre>
- * Note that this works even if you didn't provide initial query. Cursor will
+ * Note that this works even if you didn't provide the initial query. Cursor will
* be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
* is called.
* <p>
* {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
* (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
* (see {@link #setLocalListener(CacheEntryUpdatedListener)}).
- * If filter and/or listener are annotated with {@link IgniteAsyncCallback} then annotated callback
- * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
- * and notification order is kept the same as update order for given cache key.
+ * If a filter and/or listener are annotated with {@link IgniteAsyncCallback} then the annotated callback
+ * is executed in an async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
+ * and the notification order is kept the same as the update order for a given cache key.
*
* @see ContinuousQueryWithTransformer
* @see IgniteAsyncCallback
@@ -130,10 +130,10 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
}
/**
- * Sets local callback. This callback is called only in local node when new updates are received.
+ * Sets a local callback. This callback is called only on the local node when new updates are received.
* <p>
- * The callback predicate accepts ID of the node from where updates are received and collection
- * of received entries. Note that for removed entries value will be {@code null}.
+ * The callback predicate accepts ID of the node from where updates are received and a collection
+ * of the received entries. Note that for removed entries values will be {@code null}.
* <p>
* If the predicate returns {@code false}, query execution will be cancelled.
* <p>
@@ -141,7 +141,7 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
* synchronization or transactional cache operations), should be executed asynchronously without
* blocking the thread that called the callback. Otherwise, you can get deadlocks.
* <p>
- * If local listener are annotated with {@link IgniteAsyncCallback} then it is executed in async callback pool
+ * If the local listener is annotated with {@link IgniteAsyncCallback} then it is executed in an async callback pool
* (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
*
* @param locLsnr Local callback.
@@ -157,8 +157,6 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
}
/**
- * Gets local listener.
- *
* @return Local listener.
*/
public CacheEntryUpdatedListener<K, V> getLocalListener() {
@@ -214,7 +212,7 @@ public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
}
/**
- * Sets whether this query should be executed on local node only.
+ * Sets whether this query should be executed on the local node only.
*
* Note: backup event queues are not kept for local continuous queries. It may lead to loss of notifications in case
* of node failures. Use {@link ContinuousQuery#setRemoteFilterFactory(Factory)} to register cache event listeners
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e15cdd0..017a1b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T2;
@@ -92,6 +94,9 @@ class GridEventConsumeHandler implements GridContinuousHandler {
/** Listener. */
private GridLocalEventListener lsnr;
+ /** P2P unmarshalling future. */
+ private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
+
/**
* Required by {@link Externalizable}.
*/
@@ -147,6 +152,21 @@ class GridEventConsumeHandler implements GridContinuousHandler {
return Collections.emptyMap();
}
+ /**
+ * Performs remote filter initialization.
+ *
+ * @param filter Remote filter.
+ * @param ctx Kernal context.
+ * @throws IgniteCheckedException If initialization failed.
+ */
+ private void initFilter(IgnitePredicate<Event> filter, GridKernalContext ctx) throws IgniteCheckedException {
+ if (filter != null)
+ ctx.resource().injectGeneric(filter);
+
+ if (filter instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)filter).initialize(ctx);
+ }
+
/** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
@@ -157,12 +177,6 @@ class GridEventConsumeHandler implements GridContinuousHandler {
if (cb != null)
ctx.resource().injectGeneric(cb);
- if (filter != null)
- ctx.resource().injectGeneric(filter);
-
- if (filter instanceof PlatformEventFilterListener)
- ((PlatformEventFilterListener)filter).initialize(ctx);
-
final boolean loc = nodeId.equals(ctx.localNodeId());
lsnr = new GridLocalEventListener() {
@@ -262,7 +276,18 @@ class GridEventConsumeHandler implements GridContinuousHandler {
if (F.isEmpty(types))
types = EVTS_ALL;
- ctx.event().addLocalEventListener(lsnr, types);
+ p2pUnmarshalFut.listen((fut) -> {
+ if (fut.error() == null) {
+ try {
+ initFilter(filter, ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw F.wrap(e);
+ }
+
+ ctx.event().addLocalEventListener(lsnr, types);
+ }
+ });
return RegisterStatus.REGISTERED;
}
@@ -387,13 +412,22 @@ class GridEventConsumeHandler implements GridContinuousHandler {
assert ctx.config().isPeerClassLoadingEnabled();
if (filterBytes != null) {
- GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
- depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+ try {
+ GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+ depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
- if (dep == null)
- throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+ filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
- filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ ((GridFutureAdapter)p2pUnmarshalFut).onDone();
+ }
+ catch (IgniteCheckedException e) {
+ ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
+
+ throw e;
+ }
}
}
@@ -454,6 +488,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
boolean b = in.readBoolean();
if (b) {
+ p2pUnmarshalFut = new GridFutureAdapter<>();
filterBytes = U.readByteArray(in);
clsName = U.readString(in);
depInfo = (GridDeploymentInfo)in.readObject();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index a98ed50..83de474 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -26,12 +26,15 @@ import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -67,6 +70,9 @@ public class GridMessageListenHandler implements GridContinuousHandler {
/** */
private boolean depEnabled;
+ /** P2P unmarshalling future. */
+ private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
+
/**
* Required by {@link Externalizable}.
*/
@@ -85,22 +91,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
this.pred = pred;
}
- /**
- *
- * @param orig Handler to be copied.
- */
- public GridMessageListenHandler(GridMessageListenHandler orig) {
- assert orig != null;
-
- this.clsName = orig.clsName;
- this.depInfo = orig.depInfo;
- this.pred = orig.pred;
- this.predBytes = orig.predBytes;
- this.topic = orig.topic;
- this.topicBytes = orig.topicBytes;
- this.depEnabled = false;
- }
-
/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
@@ -138,9 +128,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
- throws IgniteCheckedException {
- ctx.io().addUserMessageListener(topic, pred, nodeId);
+ @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) {
+ p2pUnmarshalFut.listen((fut) -> {
+ if (fut.error() == null)
+ ctx.io().addUserMessageListener(topic, pred, nodeId);
+ });
return RegisterStatus.REGISTERED;
}
@@ -186,18 +178,27 @@ public class GridMessageListenHandler implements GridContinuousHandler {
assert ctx != null;
assert ctx.config().isPeerClassLoadingEnabled();
- GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
- depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+ try {
+ GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+ depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
- if (dep == null)
- throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+ ClassLoader ldr = dep.classLoader();
- ClassLoader ldr = dep.classLoader();
+ if (topicBytes != null)
+ topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.config()));
- if (topicBytes != null)
- topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.config()));
+ pred = U.unmarshal(ctx, predBytes, U.resolveClassLoader(ldr, ctx.config()));
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
+
+ throw e;
+ }
- pred = U.unmarshal(ctx, predBytes, U.resolveClassLoader(ldr, ctx.config()));
+ ((GridFutureAdapter)p2pUnmarshalFut).onDone();
}
/** {@inheritDoc} */
@@ -256,6 +257,7 @@ public class GridMessageListenHandler implements GridContinuousHandler {
depEnabled = in.readBoolean();
if (depEnabled) {
+ p2pUnmarshalFut = new GridFutureAdapter<>();
topicBytes = U.readByteArray(in);
predBytes = U.readByteArray(in);
clsName = U.readString(in);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index c91792d..d683ecb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -172,6 +172,9 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
autoUnsubscribe,
prj.predicate()));
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
unguard();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index e86e83a..6719379 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -241,6 +241,9 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
false,
prj.predicate()));
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
unguard();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
index f888467..c4e4005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -38,6 +40,7 @@ class CacheContinuousQueryDeployableObject implements Externalizable {
private static final long serialVersionUID = 0L;
/** Serialized object. */
+ @GridToStringExclude
private byte[] bytes;
/** Deployment class name. */
@@ -107,4 +110,9 @@ class CacheContinuousQueryDeployableObject implements Externalizable {
clsName = U.readString(in);
depInfo = (GridDeploymentInfo)in.readObject();
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryDeployableObject.class, this);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 19eb3bb..34f66b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -33,10 +33,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -54,8 +56,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
@@ -63,6 +65,8 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -114,6 +118,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** Topic for ordered messages. */
private Object topic;
+ /** P2P unmarshalling future. */
+ protected transient IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();
+
+ /** Initialization future. */
+ protected transient IgniteInternalFuture<Void> initFut;
+
/** Local listener. */
private transient CacheEntryUpdatedListener<K, V> locLsnr;
@@ -323,37 +333,26 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
assert routineId != null;
assert ctx != null;
- if (locLsnr != null) {
- if (locLsnr instanceof JCacheQueryLocalListener) {
- ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl);
-
- asyncCb = ((JCacheQueryLocalListener)locLsnr).async();
- }
- else {
- ctx.resource().injectGeneric(locLsnr);
-
- asyncCb = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
- }
- }
-
- final CacheEntryEventFilter filter = getEventFilter();
+ initLocalListener(locLsnr, ctx);
- if (filter != null) {
- if (filter instanceof JCacheQueryRemoteFilter) {
- if (((JCacheQueryRemoteFilter)filter).impl != null)
- ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl);
+ if (initFut == null) {
+ initFut = p2pUnmarshalFut.chain((fut) -> {
+ try {
+ fut.get();
- if (!asyncCb)
- asyncCb = ((JCacheQueryRemoteFilter)filter).async();
- }
- else {
- ctx.resource().injectGeneric(filter);
+ initRemoteFilter(getEventFilter0(), ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to initialize a remote filter.", e);
+ }
- if (!asyncCb)
- asyncCb = U.hasAnnotation(filter, IgniteAsyncCallback.class);
- }
+ return null;
+ });
}
+ if (initFut.error() != null)
+ throw new IgniteCheckedException("Failed to initialize a continuous query.", initFut.error());
+
entryBufs = new ConcurrentHashMap<>();
ackBuf = new CacheContinuousQueryAcknowledgeBuffer();
@@ -373,29 +372,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
- @Override public void onExecution() {
- GridCacheContext<K, V> cctx = cacheContext(ctx);
-
- if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
- //noinspection unchecked
- ctx.event().record(new CacheQueryExecutedEvent<>(
- ctx.discovery().localNode(),
- "Continuous query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.CONTINUOUS.name(),
- cacheName,
- null,
- null,
- null,
- filter instanceof CacheEntryEventSerializableFilter ?
- (CacheEntryEventSerializableFilter)filter : null,
- null,
- nodeId,
- taskName()
- ));
- }
- }
-
@Override public void onRegister() {
GridCacheContext<K, V> cctx = cacheContext(ctx);
@@ -453,8 +429,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
@Override public void onUnregister() {
- if (filter instanceof PlatformContinuousQueryFilter)
- ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
+ try {
+ CacheEntryEventFilter filter = getEventFilter();
+
+ if (filter instanceof PlatformContinuousQueryFilter)
+ ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to execute the onUnregister callback " +
+ "on the continuoue query listener. " +
+ "[nodeId=" + nodeId + ", routineId=" + routineId + ", cacheName=" + cacheName +
+ ", err=" + e + "]");
+ }
+ }
}
@Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
@@ -637,13 +625,112 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (mgr == null)
return RegisterStatus.DELAYED;
- return mgr.registerListener(routineId, lsnr, internal);
+ RegisterStatus regStatus = mgr.registerListener(routineId, lsnr, internal);
+
+ if (regStatus == RegisterStatus.REGISTERED)
+ initFut.listen(res -> sendQueryExecutedEvent());
+
+ return regStatus;
+ }
+
+ /**
+ * Fires continuous query execution event.
+ * @see org.apache.ignite.events.EventType#EVT_CACHE_QUERY_EXECUTED
+ */
+ private void sendQueryExecutedEvent() {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ CacheEntryEventFilter filter;
+ try {
+ filter = getEventFilter();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to trigger the continuoue query executed event. " +
+ "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]");
+ }
+
+ return;
+ }
+
+ if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+ //noinspection unchecked
+ ctx.event().record(new CacheQueryExecutedEvent<K, V>(
+ ctx.discovery().localNode(),
+ "Continuous query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.CONTINUOUS.name(),
+ cacheName,
+ null,
+ null,
+ null,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
+ null,
+ nodeId,
+ taskName()
+ ));
+ }
+ }
+
+ /**
+ * Performs resource injection and checks asynchrony for the provided local listener.
+ *
+ * @param lsnr Local listener.
+ * @param ctx Kernal context.
+ * @throws IgniteCheckedException If failed to perform resource injection.
+ */
+ private void initLocalListener(CacheEntryListener lsnr, GridKernalContext ctx) throws IgniteCheckedException {
+ if (lsnr != null) {
+ CacheEntryListener impl =
+ lsnr instanceof JCacheQueryLocalListener
+ ? ((JCacheQueryLocalListener)lsnr).impl
+ : lsnr;
+
+ ctx.resource().injectGeneric(impl);
+
+ asyncCb = U.hasAnnotation(impl, IgniteAsyncCallback.class);
+ }
+ }
+
+ /**
+ * Performs resource injection and checks asynchrony for the provided remote filter.
+ *
+ * @param filter Remote filter.
+ * @param ctx Kernal context.
+ * @throws IgniteCheckedException If failed to perform resource injection.
+ */
+ protected void initRemoteFilter(CacheEntryEventFilter filter, GridKernalContext ctx) throws IgniteCheckedException {
+ CacheEntryEventFilter impl =
+ filter instanceof JCacheQueryRemoteFilter
+ ? ((JCacheQueryRemoteFilter)filter).impl
+ : filter;
+
+ if (impl != null) {
+ ctx.resource().injectGeneric(impl);
+
+ if (!asyncCb)
+ asyncCb = U.hasAnnotation(impl, IgniteAsyncCallback.class);
+ }
+ }
+
+ /**
+ * @return Cache entry event filter.
+ *
+ * @throws IgniteCheckedException If P2P unmarshalling failed.
+ */
+ public CacheEntryEventFilter getEventFilter() throws IgniteCheckedException {
+ initFut.get();
+
+ return getEventFilter0();
}
/**
+ * Returns an event filter without waiting on the unmarshalling future.
+ *
* @return Cache entry event filter.
*/
- public CacheEntryEventFilter getEventFilter() {
+ protected CacheEntryEventFilter getEventFilter0() {
return rmtFilter;
}
@@ -868,7 +955,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/**
* @param evt Query event.
- * @return {@code True} if event passed filter otherwise {@code true}.
+ * @return {@code True} if event passed filter otherwise {@code false}.
*/
public boolean filter(CacheContinuousQueryEvent evt) {
CacheContinuousQueryEntry entry = evt.entry();
@@ -895,7 +982,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
* @param loc Listener deployed on this node.
* @param recordIgniteEvt Record ignite event.
*/
- private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) {
+ private void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt,
+ boolean notify, boolean loc, boolean recordIgniteEvt) {
try {
GridCacheContext<K, V> cctx = cacheContext(ctx);
@@ -948,8 +1036,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
if (recordIgniteEvt && notify) {
+ CacheEntryEventFilter filter;
+ try {
+ filter = getEventFilter();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to trigger a continuoue query event. " +
+ "[routineId=" + routineId + ", cacheName=" + cacheName + ", err=" + e + "]");
+ }
+
+ return;
+ }
+
//noinspection unchecked
- ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.event().record(new CacheQueryReadEvent<K, V>(
ctx.discovery().localNode(),
"Continuous query executed.",
EVT_CACHE_QUERY_OBJECT_READ,
@@ -958,8 +1059,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
null,
null,
null,
- getEventFilter() instanceof CacheEntryEventSerializableFilter ?
- (CacheEntryEventSerializableFilter)getEventFilter() : null,
+ filter instanceof CacheEntryEventSerializableFilter ?
+ (CacheEntryEventSerializableFilter)filter : null,
null,
nodeId,
taskName(),
@@ -1153,7 +1254,41 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
assert ctx.config().isPeerClassLoadingEnabled();
if (rmtFilterDep != null)
- rmtFilter = rmtFilterDep.unmarshal(nodeId, ctx);
+ rmtFilter = p2pUnmarshal(rmtFilterDep, nodeId, ctx);
+
+ if (!p2pUnmarshalFut.isDone())
+ ((GridFutureAdapter)p2pUnmarshalFut).onDone();
+ }
+
+ /**
+ * @return Whether the handler is marshalled for peer class loading.
+ */
+ public boolean isMarshalled() {
+ return rmtFilter == null || U.isGrid(rmtFilter.getClass()) || rmtFilterDep != null;
+ }
+
+ /**
+ * @param depObj Deployable object to unmarshal.
+ * @param nodeId Sender node Id.
+ * @param ctx Kernal context.
+ * @param <T> Result type.
+ * @return Unmarshalled object.
+ * @throws IgniteCheckedException In case of unmarshalling failures.
+ */
+ protected <T> T p2pUnmarshal(CacheContinuousQueryDeployableObject depObj,
+ UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+ if (depObj != null) {
+ try {
+ return depObj.unmarshal(nodeId, ctx);
+ }
+ catch (IgniteCheckedException e) {
+ ((GridFutureAdapter)p2pUnmarshalFut).onDone(e);
+
+ throw e;
+ }
+ }
+ else
+ return null;
}
/** {@inheritDoc} */
@@ -1262,8 +1397,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
boolean b = in.readBoolean();
- if (b)
+ if (b) {
rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject();
+
+ p2pUnmarshalFut = new GridFutureAdapter<>();
+ }
else
rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 86c1ae1..1d968c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -99,7 +100,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
}
/** {@inheritDoc} */
- @Override public CacheEntryEventFilter getEventFilter() {
+ @Override protected CacheEntryEventFilter getEventFilter0() {
if (filter == null) {
assert rmtFilterFactory != null;
@@ -124,10 +125,16 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
/** {@inheritDoc} */
@Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+ if (rmtFilterFactoryDep != null)
+ rmtFilterFactory = p2pUnmarshal(rmtFilterFactoryDep, nodeId, ctx);
+
super.p2pUnmarshal(nodeId, ctx);
+ }
- if (rmtFilterFactoryDep != null)
- rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
+ /** {@inheritDoc} */
+ @Override public boolean isMarshalled() {
+ return super.isMarshalled() &&
+ (rmtFilterFactory == null || U.isGrid(rmtFilterFactory.getClass()) || rmtFilterFactoryDep != null);
}
/** {@inheritDoc} */
@@ -163,9 +170,12 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
boolean b = in.readBoolean();
- if (b)
+ if (b) {
rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
- else
+
+ if (p2pUnmarshalFut.isDone())
+ p2pUnmarshalFut = new GridFutureAdapter<>();
+ } else
rmtFilterFactory = (Factory)in.readObject();
types = in.readByte();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
index 0008cfc..552e87d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteClosure;
@@ -114,11 +115,11 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan
}
/** {@inheritDoc} */
- @Override public CacheEntryEventFilter<K, V> getEventFilter() {
+ @Override protected CacheEntryEventFilter getEventFilter0() {
if (rmtFilterFactory == null)
return null;
- return super.getEventFilter();
+ return super.getEventFilter0();
}
/** {@inheritDoc} */
@@ -148,10 +149,16 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan
/** {@inheritDoc} */
@Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+ if (rmtTransFactoryDep != null)
+ rmtTransFactory = p2pUnmarshal(rmtTransFactoryDep, nodeId, ctx);
+
super.p2pUnmarshal(nodeId, ctx);
+ }
- if (rmtTransFactoryDep != null)
- rmtTransFactory = rmtTransFactoryDep.unmarshal(nodeId, ctx);
+ /** {@inheritDoc} */
+ @Override public boolean isMarshalled() {
+ return super.isMarshalled() &&
+ (rmtTransFactory == null || U.isGrid(rmtTransFactory.getClass()) || rmtTransFactoryDep != null);
}
/** {@inheritDoc} */
@@ -174,9 +181,12 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan
boolean b = in.readBoolean();
- if (b)
+ if (b) {
rmtTransFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
- else
+
+ if (p2pUnmarshalFut.isDone())
+ p2pUnmarshalFut = new GridFutureAdapter<>();
+ } else
rmtTransFactory = (Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a652c51..e534fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -30,11 +30,6 @@ import org.jetbrains.annotations.Nullable;
*/
public interface CacheContinuousQueryListener<K, V> {
/**
- * Query execution callback.
- */
- public void onExecution();
-
- /**
* Entry update callback.
*
* @param evt Event
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0ae980a..d2fecbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -73,7 +73,6 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
@@ -840,7 +839,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cctx.kernalContext().cache().jcache(cctx.name()),
cctx, entry);
- if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next))
+ if (!hnd.filter(next))
next = null;
}
}
@@ -955,9 +954,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
finally {
cctx.group().listenerLock().writeLock().unlock();
}
-
- if (added)
- lsnr.onExecution();
}
return added ? GridContinuousHandler.RegisterStatus.REGISTERED
@@ -1212,13 +1208,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
return evts;
}
- /**
- * @return {@code True} if listener should be executed in non-system thread.
- */
- protected boolean async() {
- return U.hasAnnotation(impl, IgniteAsyncCallback.class);
- }
-
/** {@inheritDoc} */
@Override public void close() throws IOException {
if (impl instanceof Closeable)
@@ -1286,13 +1275,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
- * @return {@code True} if filter should be executed in non-system thread.
- */
- protected boolean async() {
- return U.hasAnnotation(impl, IgniteAsyncCallback.class);
- }
-
- /**
* @param evtType Type.
* @return Flag value.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index bbd2290..f6bd571 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -44,8 +44,9 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridMessageListenHandler;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -444,12 +445,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId = e.getKey();
LocalRoutineInfo info = e.getValue();
+ assert !ctx.config().isPeerClassLoadingEnabled() ||
+ !(info.hnd instanceof CacheContinuousQueryHandler) ||
+ ((CacheContinuousQueryHandler)info.hnd).isMarshalled();
+
data.addItem(new DiscoveryDataItem(routineId,
- info.prjPred,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe));
+ info.prjPred,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe));
}
return data;
@@ -505,13 +510,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) {
routinesInfo.addRoutineInfo(routineInfo);
- startDiscoveryDataRoutine(routineInfo);
+ onDiscoveryDataReceivedV2(routineInfo);
}
}
}
else {
if (data.hasJoiningNodeData())
- onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+ onDiscoveryDataReceivedV1((DiscoveryData) data.joiningNodeData());
}
}
@@ -531,7 +536,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
routinesInfo.addRoutineInfo(routineInfo);
- startDiscoveryDataRoutine(routineInfo);
+ onDiscoveryDataReceivedV2(routineInfo);
}
}
}
@@ -540,15 +545,62 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (nodeSpecData != null) {
for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
- onDiscoDataReceived((DiscoveryData) e.getValue());
+ onDiscoveryDataReceivedV1((DiscoveryData) e.getValue());
+ }
+ }
+ }
+
+ /**
+ * Processes data received in a discovery message.
+ * Used with protocol version 1.
+ *
+ * @param data Received discovery data.
+ */
+ private void onDiscoveryDataReceivedV1(DiscoveryData data) {
+ if (!ctx.isDaemon() && data != null) {
+ for (DiscoveryDataItem item : data.items) {
+ if (!locInfos.containsKey(item.routineId)) {
+ registerHandlerOnJoin(data.nodeId, item.routineId, item.prjPred,
+ item.hnd, item.bufSize, item.interval, item.autoUnsubscribe);
+ }
+
+ if (!item.autoUnsubscribe) {
+ locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
+ item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
+ }
+ }
+
+ // Process CQs started on clients.
+ for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+ UUID clientNodeId = entry.getKey();
+
+ if (!ctx.localNodeId().equals(clientNodeId)) {
+ Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+
+ for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+ UUID routineId = e.getKey();
+ LocalRoutineInfo info = e.getValue();
+
+ registerHandlerOnJoin(clientNodeId, routineId, info.prjPred,
+ info.hnd, info.bufSize, info.interval, info.autoUnsubscribe);
+ }
+ }
+
+ Map<UUID, LocalRoutineInfo> map =
+ clientInfos.computeIfAbsent(clientNodeId, k -> new HashMap<>());
+
+ map.putAll(entry.getValue());
}
}
}
/**
+ * Processes data received in a discovery message.
+ * Used with protocol version 2.
+ *
* @param routineInfo Routine info.
*/
- private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) {
+ private void onDiscoveryDataReceivedV2(ContinuousRoutineInfo routineInfo) {
IgnitePredicate<ClusterNode> nodeFilter;
try {
@@ -561,122 +613,94 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
nodeFilter = null;
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [" +
+ U.error(log, "Failed to unmarshal continuous routine filter [" +
"routineId=" + routineInfo.routineId +
", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
return;
}
- ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> {
- if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) {
- GridContinuousHandler hnd;
+ GridContinuousHandler hnd;
- try {
- hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config()));
+ try {
+ hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal continuous routine handler [" +
+ "routineId=" + routineInfo.routineId +
+ ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
- if (ctx.config().isPeerClassLoadingEnabled())
- hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" +
- "routineId=" + routineInfo.routineId +
- ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
- return;
- }
+ return;
+ }
- try {
- registerHandler(routineInfo.srcNodeId,
- routineInfo.routineId,
+ registerHandlerOnJoin(routineInfo.srcNodeId, routineInfo.routineId, nodeFilter,
+ hnd, routineInfo.bufSize, routineInfo.interval, routineInfo.autoUnsubscribe);
+ }
+
+ /**
+ * Register a continuous query handler on local node join.
+ *
+ * @param srcNodeId Id of the subscriber node.
+ * @param routineId Routine id.
+ * @param nodeFilter Node filter.
+ * @param hnd Continuous query handler.
+ * @param bufSize Buffer size.
+ * @param interval Time interval for buffer checker.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
+ */
+ private void registerHandlerOnJoin(UUID srcNodeId, UUID routineId, IgnitePredicate<ClusterNode> nodeFilter,
+ GridContinuousHandler hnd, int bufSize, long interval, boolean autoUnsubscribe) {
+
+ try {
+ if (nodeFilter != null)
+ ctx.resource().injectGeneric(nodeFilter);
+
+ if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) {
+ registerHandler(srcNodeId,
+ routineId,
hnd,
- routineInfo.bufSize,
- routineInfo.interval,
- routineInfo.autoUnsubscribe,
+ bufSize,
+ interval,
+ autoUnsubscribe,
false);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to register continuous routine handler, ignore routine [" +
- "routineId=" + routineInfo.routineId +
- ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
- }
}
else {
if (log.isDebugEnabled()) {
log.debug("Do not register continuous routine, rejected by node filter [" +
- "routineId=" + routineInfo.routineId +
- ", srcNodeId=" + routineInfo.srcNodeId + ']');
- }
- }
- }));
- }
-
- /**
- * @param data received discovery data.
- */
- private void onDiscoDataReceived(DiscoveryData data) {
- if (!ctx.isDaemon() && data != null) {
- for (DiscoveryDataItem item : data.items) {
- try {
- if (item.prjPred != null)
- ctx.resource().injectGeneric(item.prjPred);
-
- // Register handler only if local node passes projection predicate.
- if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
- !locInfos.containsKey(item.routineId))
- registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
- item.autoUnsubscribe, false);
-
- if (!item.autoUnsubscribe)
- // Register routine locally.
- locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
- item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to register continuous handler.", e);
+ "routineId=" + routineId +
+ ", srcNodeId=" + srcNodeId + ']');
}
}
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to register continuous routine handler [" +
+ "routineId=" + routineId +
+ ", srcNodeId=" + srcNodeId + ']', e);
- for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
- UUID clientNodeId = entry.getKey();
-
- if (!ctx.clientNode()) {
- Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
-
- for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
- UUID routineId = e.getKey();
- LocalRoutineInfo info = e.getValue();
-
- try {
- if (info.prjPred != null)
- ctx.resource().injectGeneric(info.prjPred);
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
- if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
- registerHandler(clientNodeId,
- routineId,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe,
- false);
- }
- }
- catch (IgniteCheckedException err) {
- U.error(log, "Failed to register continuous handler.", err);
- }
- }
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ // Peer class loading cannot be performed before a node joins, so we delay the deployment.
+ // Run the deployment task in the system pool to avoid blocking of the discovery thread.
+ ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> {
+ try {
+ hnd.p2pUnmarshal(srcNodeId, ctx);
}
+ catch (IgniteCheckedException | IgniteException e) {
+ U.error(log, "Failed to unmarshal continuous routine handler [" +
+ "routineId=" + routineId +
+ ", srcNodeId=" + srcNodeId + ']', e);
- Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
- if (map == null) {
- map = new HashMap<>();
-
- clientInfos.put(entry.getKey(), map);
+ unregisterHandler(routineId, hnd, false);
}
-
- map.putAll(entry.getValue());
- }
+ }));
}
}
@@ -721,6 +745,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* Registers routine info to be sent in discovery data during this node join
* (to be used for internal queries started from client nodes).
*
+ * Peer class loading is not applied to static routines.
+ *
* @param cacheName Cache name.
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
@@ -818,7 +844,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
int bufSize,
long interval,
boolean autoUnsubscribe,
- @Nullable IgnitePredicate<ClusterNode> prjPred) {
+ @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
assert hnd != null;
assert bufSize > 0;
assert interval >= 0;
@@ -826,6 +852,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Generate ID.
final UUID routineId = UUID.randomUUID();
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ hnd.p2pMarshal(ctx);
+
+ assert !(hnd instanceof CacheContinuousQueryHandler) || ((CacheContinuousQueryHandler)hnd).isMarshalled();
+ }
+
// Register routine locally.
locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
@@ -1369,10 +1401,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
}
- GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
- new GridMessageListenHandler((GridMessageListenHandler)hnd) :
- hnd;
-
if (node.isClient()) {
Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
@@ -1385,7 +1413,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
- hnd0,
+ hnd,
data.bufferSize(),
data.interval(),
data.autoUnsubscribe()));
@@ -1400,13 +1428,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
!locInfos.containsKey(routineId))
- registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
+ registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
data.autoUnsubscribe(), false);
if (!data.autoUnsubscribe())
// Register routine locally.
locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
- prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
+ prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
}
catch (IgniteCheckedException e) {
err = e;
@@ -1416,14 +1444,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Load partition counters.
- if (hnd0.isQuery()) {
+ if (hnd.isQuery()) {
GridCacheProcessor proc = ctx.cache();
if (proc != null) {
- GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
if (cache != null && !cache.isLocal() && cache.context().userCache())
- req.addUpdateCounters(ctx.localNodeId(), hnd0.updateCounters());
+ req.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
}
}
@@ -1544,23 +1572,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
((CacheContinuousQueryHandler)hnd).keepBinary(true);
}
- GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
- new GridMessageListenHandler((GridMessageListenHandler)hnd) :
- hnd;
-
registerHandler(snd.id(),
msg.routineId,
- hnd0,
+ hnd,
reqData.bufferSize(),
reqData.interval(),
reqData.autoUnsubscribe(),
false);
- if (hnd0.isQuery()) {
+ if (hnd.isQuery()) {
GridCacheProcessor proc = ctx.cache();
if (proc != null) {
- GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
if (cache != null && !cache.isLocal() && cache.context().userCache()) {
CachePartitionPartialCountersMap cntrsMap =
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
new file mode 100644
index 0000000..b4ce91f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLongP2PTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
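+ * Runs the continuous query P2P tests with delayed deployment responses and checks that slow peer class loading does not block a node start.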
+ */
+public class CacheContinuousQueryLongP2PTest extends CacheContinuousQueryOperationP2PTest {
+ /** Time in milliseconds to sleep before sending each p2p deployment response. */
+ private static volatile int delay;
+
+ /** {@inheritDoc} */
+ @Override protected CommunicationSpi communicationSpi() {
+ return new P2PDelayingCommunicationSpi();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ delay = 300;
+
+ super.beforeTest();
+ }
+
+ /**
+ * Checks that a node start is not blocked by peer class loading of the continuous query remote filter.
+ *
+ * @throws Exception If failed.
+ */
+ @Test(timeout = 60_000)
+ public void testLongP2PClassLoadingDoesntBlockNodeStart() throws Exception {
+ delay = 3_000;
+
+ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, ATOMIC);
+ IgniteCache cache = grid(NODES - 1).getOrCreateCache(ccfg.getName());
+
+ ContinuousQuery<Integer, Integer> qry = continuousQuery();
+
+ cache.query(qry);
+
+ AtomicReference<String> err = new AtomicReference<>();
+
+ IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(() -> {
+ try {
+ startGrid(NODES);
+ }
+ catch (Exception e) {
+ err.set(e.getMessage());
+
+ e.printStackTrace();
+ }
+ });
+
+ startFut.get(1, TimeUnit.SECONDS);
+
+ assertNull("Error occurred when starting a node: " + err.get(), err.get());
+ }
+
+ /**
+ * @return Continuous query with remote filter from an external class loader.
+ * @throws Exception If failed.
+ */
+ private ContinuousQuery<Integer, Integer> continuousQuery() throws Exception {
+ final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
+ (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener((evt) -> {});
+
+ qry.setRemoteFilterFactory(
+ (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactoryCls.newInstance());
+
+ return qry;
+ }
+
+ /**
+ * {@link TcpCommunicationSpi} that sleeps for {@code delay} milliseconds before sending each p2p deployment response.
+ */
+ private static class P2PDelayingCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+ throws IgniteSpiException {
+ if (isDeploymentResponse((GridIoMessage) msg)) {
+ log.info(">>> Delaying deployment message: " + msg);
+
+ try {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /**
+ * Checks if it is a p2p deployment response.
+ *
+ * @param msg Message to check.
+ * @return {@code True} if this is a p2p response.
+ */
+ private boolean isDeploymentResponse(GridIoMessage msg) {
+ Object origMsg = msg.message();
+
+ return origMsg instanceof GridDeploymentResponse;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
index 39debb0..9a53315 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
@@ -28,6 +29,7 @@ import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
@@ -35,6 +37,9 @@ import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -50,7 +55,10 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
*/
public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
/** */
- private static final int NODES = 5;
+ public static final int NODES = 5;
+
+ /** */
+ private static final int UPDATES = 100;
/** */
private boolean client;
@@ -59,27 +67,34 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.setCommunicationSpi(communicationSpi());
+
cfg.setClientMode(client);
cfg.setPeerClassLoadingEnabled(true);
return cfg;
}
+ /**
+ * @return Communication SPI to use during a test.
+ */
+ protected CommunicationSpi communicationSpi() {
+ return new TcpCommunicationSpi();
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
startGridsMultiThreaded(NODES - 1);
client = true;
startGrid(NODES - 1);
+
+ client = false;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- super.afterTest();
-
stopAllGrids();
}
@@ -240,85 +255,158 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest
}
/**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultithreadedUpdatesNodeJoin() throws Exception {
+ Ignite client = startGrid("client");
+
+ CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(PARTITIONED,
+ 0,
+ ATOMIC
+ );
+ IgniteCache<Object, Object> cache = client.createCache(cacheCfg);
+
+ int iterations = 50;
+ int keysNum = 100;
+ int threadsNum = Runtime.getRuntime().availableProcessors();
+
+ CountDownLatch updatesLatch = new CountDownLatch(iterations * keysNum * threadsNum / 2);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
+ (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+ qry.setRemoteFilterFactory(
+ (Factory<? extends CacheEntryEventFilter<Object, Object>>)(Object)evtFilterFactoryCls.newInstance());
+
+ qry.setLocalListener((evts) -> {
+ for (CacheEntryEvent<?, ?> ignored : evts)
+ updatesLatch.countDown();
+ });
+
+ cache.query(qry);
+
+ for (int t = 0; t < threadsNum; t++) {
+ int threadId = t;
+
+ GridTestUtils.runAsync(() -> {
+ for (int i = 0; i < iterations; i++) {
+ log.info("Iteration #" + (i + 1));
+
+ for (int k = 0; k < keysNum; k++) {
+ int key = keysNum * threadId + k;
+
+ cache.put(key, key);
+ }
+ }
+ }, "cache-writer-thread-" + threadId);
+ }
+
+ startGrid(NODES);
+
+ assertTrue("Failed to wait for all cache updates invocations. Latch: " + updatesLatch,
+ updatesLatch.await(30, TimeUnit.SECONDS));
+ }
+
+ /**
* @param ccfg Cache configuration.
* @param isClient Client.
* @throws Exception If failed.
*/
protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
throws Exception {
- ignite(0).createCache(ccfg);
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
- QueryCursor<?> cur = null;
+ ignite(0).createCache(ccfg);
- final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+ final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
(Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
- final CountDownLatch latch = new CountDownLatch(10);
+ testContinuousQuery(ccfg, isClient, false, evtFilterFactoryCls);
+ testContinuousQuery(ccfg, isClient, true, evtFilterFactoryCls);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param isClient Client.
+ * @param joinNode If a node should be added to topology after a query is started.
+ * @param evtFilterFactoryCls Remote filter factory class.
+ * @throws Exception If failed.
+ */
+ private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg,
+ boolean isClient, boolean joinNode,
+ Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls) throws Exception {
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ final CountDownLatch latch = new CountDownLatch(UPDATES);
ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
- TestLocalListener localLsnr = new TestLocalListener() {
- @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts)
- throws CacheEntryListenerException {
+ AtomicReference<String> err = new AtomicReference<>();
+
+ TestLocalListener locLsnr = new TestLocalListener() {
+ @Override protected void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
latch.countDown();
log.info("Received event: " + evt);
+
+ int key = evt.getKey();
+
+ if (key % 2 == 0)
+ err.set("Event received on entry, that doesn't pass a filter: " + key);
}
}
};
+ qry.setLocalListener(locLsnr);
+
+ qry.setRemoteFilterFactory(
+ (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactoryCls.newInstance());
+
MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
new MutableCacheEntryListenerConfiguration<>(
- new FactoryBuilder.SingletonFactory<>(localLsnr),
+ new FactoryBuilder.SingletonFactory<>(locLsnr),
(Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
- (Object)evtFilterFactory.newInstance(),
+ (Object)evtFilterFactoryCls.newInstance(),
true,
true
);
- qry.setLocalListener(localLsnr);
+ IgniteCache<Integer, Integer> cache;
- qry.setRemoteFilterFactory(
- (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
-
- IgniteCache<Integer, Integer> cache = null;
-
- try {
- if (isClient)
- cache = grid(NODES - 1).cache(ccfg.getName());
- else
- cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
-
- cur = cache.query(qry);
+ cache = isClient
+ ? grid(NODES - 1).cache(ccfg.getName())
+ : grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+ try (QueryCursor<?> cur = cache.query(qry)) {
cache.registerCacheEntryListener(lsnrCfg);
- for (int i = 0; i < 10; i++)
+ if (joinNode) {
+ startGrid(NODES);
+ awaitPartitionMapExchange();
+ }
+
+ for (int i = 0; i < UPDATES; i++)
cache.put(i, i);
- assertTrue(latch.await(3, TimeUnit.SECONDS));
- }
- finally {
- if (cur != null)
- cur.close();
+ assertTrue("Failed to wait for local listener invocations: " + latch.getCount(),
+ latch.await(3, TimeUnit.SECONDS));
- if (cache != null)
- cache.deregisterCacheEntryListener(lsnrCfg);
+ assertNull(err.get(), err.get());
}
}
/**
- *
* @param cacheMode Cache mode.
* @param backups Number of backups.
* @param atomicityMode Cache atomicity mode.
* @return Cache configuration.
*/
- private CacheConfiguration<Object, Object> cacheConfiguration(
+ protected CacheConfiguration<Object, Object> cacheConfiguration(
CacheMode cacheMode,
int backups,
CacheAtomicityMode atomicityMode) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryDeserializationErrorOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryDeserializationErrorOnNodeJoinTest.java
new file mode 100644
index 0000000..1ef027f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryDeserializationErrorOnNodeJoinTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that errors during CQ remote filter deserialization don't prevent a node from joining a cluster.
+ */
+public class ContinuousQueryDeserializationErrorOnNodeJoinTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+ ((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ igniteCfg.setPeerClassLoadingEnabled(false);
+
+ return igniteCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeserializationErrorOnJoiningNode() throws Exception {
+ String cacheName = "cache";
+ int recordsNum = 1000;
+
+ Ignite node1 = startGrid(1);
+
+ IgniteCache<Integer, Integer> cacheNode1 = node1.getOrCreateCache(cacheName);
+
+ ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+ qry.setLocalListener((evt) -> log.info("Event received: " + evt));
+ qry.setRemoteFilterFactory(remoteFilterFactory());
+
+ cacheNode1.query(qry);
+
+ // A deserialization error will happen when the new node tries to deserialize the discovery data.
+ Ignite node2 = startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ // Check that the node and the cache are functional.
+ IgniteCache<Integer, Integer> cacheNode2 = node2.cache(cacheName);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < recordsNum; i++) {
+ IgniteCache<Integer, Integer> cache =
+ rnd.nextBoolean() ? cacheNode1 : cacheNode2;
+
+ cache.put(i, i);
+ }
+
+ for (int i = 0; i < recordsNum; i++) {
+ assertEquals(new Integer(i), cacheNode1.get(i));
+ assertEquals(new Integer(i), cacheNode2.get(i));
+ }
+ }
+
+ /**
+ * @param <K> Key type.
+ * @param <V> Value type.
+ * @return Remote filter factory.
+ * @throws ReflectiveOperationException In case of instantiation failure.
+ */
+ private <K, V> Factory<? extends CacheEntryEventFilter<K, V>> remoteFilterFactory()
+ throws ReflectiveOperationException {
+ final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
+ (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+ loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+ return (Factory<? extends CacheEntryEventFilter<K, V>>)(Object)evtFilterFactoryCls.newInstance();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
index 945e04e..8fd8a23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
@@ -78,6 +77,13 @@ public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest
}
/**
+ * @throws Exception If failed.
+ */
+ public void testRemoteFilterFactoryFromClientToServer() throws Exception {
+ check("server1", "client", "server2");
+ }
+
+ /**
* @param node1Name Node 1 name.
* @param node2Name Node 2 name.
* @param node3Name Node 3 name.
@@ -98,9 +104,6 @@ public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest
qry1.setRemoteFilterFactory(new DummyEventFilterFactory<>());
qry2.setRemoteFilterFactory(new DummyEventFilterFactory<>());
- final AtomicInteger client1Evts = new AtomicInteger(0);
- final AtomicInteger client2Evts = new AtomicInteger(0);
-
final CountDownLatch latch1 = new CountDownLatch(20);
final CountDownLatch latch2 = new CountDownLatch(10);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java
index 2907b90..7688a7b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryNodesFilteringTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.io.Serializable;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
@@ -30,6 +32,8 @@ import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.TestFailureHandler;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridStringLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -63,14 +67,16 @@ public class GridCacheContinuousQueryNodesFilteringTest extends GridCommonAbstra
@Test
public void testNodeWithAttributeFailure() throws Exception {
try (Ignite node1 = startNodeWithCache()) {
- GridStringLogger log = new GridStringLogger();
+ CountDownLatch latch = new CountDownLatch(1);
- try (Ignite node2 = startGrid("node2", getConfiguration("node2", true, log))) {
- fail();
- }
- catch (IgniteException ignored) {
- assertTrue(log.toString().contains("Class not found for continuous query remote filter " +
- "[name=org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilter]"));
+ FailureHandler failHnd = new TestFailureHandler(false, latch);
+
+ IgniteConfiguration node2Cfg = getConfiguration("node2", true, null)
+ .setFailureHandler(failHnd);
+
+ try (Ignite node2 = startGrid(node2Cfg)) {
+ assertTrue("Failure handler hasn't been invoked on the joined node.",
+ latch.await(5, TimeUnit.SECONDS));
}
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index 1de177c..8e8e43b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationNearEnabledTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryDeserializationErrorOnNodeJoinTest;
import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryMarshallerTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest;
@@ -57,6 +58,7 @@ import org.junit.runners.Suite;
CacheKeepBinaryIterationNearEnabledTest.class,
GridCacheContinuousQueryPartitionedOnlySelfTest.class,
CacheContinuousQueryOperationP2PTest.class,
+ ContinuousQueryDeserializationErrorOnNodeJoinTest.class,
CacheContinuousBatchAckTest.class,
CacheContinuousQueryOrderingEventTest.class,
IgniteCacheContinuousQueryClientTest.class,
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java
index 2af8005..8403656 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite3.java
@@ -24,6 +24,8 @@ import org.apache.ignite.internal.processors.cache.datastructures.replicated.Gri
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedNodeRestartSelfTest;
import org.apache.ignite.internal.processors.cache.multijvm.GridCacheAtomicMultiJvmFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.multijvm.GridCachePartitionedMultiJvmFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLongP2PTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest;
import org.junit.BeforeClass;
@@ -43,7 +45,9 @@ import org.junit.runners.Suite;
GridCachePartitionedSequenceApiSelfTest.class,
GridCacheAtomicMultiJvmFullApiSelfTest.class,
GridCachePartitionedMultiJvmFullApiSelfTest.class,
- GridP2PContinuousDeploymentSelfTest.class
+ GridP2PContinuousDeploymentSelfTest.class,
+ CacheContinuousQueryOperationP2PTest.class,
+ CacheContinuousQueryLongP2PTest.class
})
public class ZookeeperDiscoverySpiTestSuite3 {
/** */