You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/20 12:52:45 UTC
[05/50] ignite git commit: ignite-3038 Do not use custom discovery
events to start continuous queries for system caches
ignite-3038 Do not use custom discovery events to start continuous queries for system caches
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7f878c56
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7f878c56
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7f878c56
Branch: refs/heads/ignite-3341
Commit: 7f878c56cf4e63cff1773dfe834cda7cd91c62ac
Parents: e10ffef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 10:17:50 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 10:17:50 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/MarshallerContextImpl.java | 51 ++-
.../processors/cache/GridCacheAdapter.java | 12 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 29 +-
.../continuous/CacheContinuousQueryManager.java | 135 +++++++
.../cacheobject/IgniteCacheObjectProcessor.java | 7 +
.../IgniteCacheObjectProcessorImpl.java | 5 +
.../continuous/GridContinuousProcessor.java | 217 +++++++---
.../service/GridServiceProcessor.java | 391 ++++++++++---------
...eClientReconnectContinuousProcessorTest.java | 60 ++-
...ridCacheContinuousQueryAbstractSelfTest.java | 2 +-
.../IgniteNoCustomEventsOnNodeStart.java | 80 ++++
.../service/GridServiceClientNodeTest.java | 102 ++++-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
13 files changed, 813 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 8f566a9..b4c9607 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
import org.apache.ignite.internal.util.GridStripedLock;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.PluginProvider;
@@ -64,6 +65,9 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
/** Non-volatile on purpose. */
private int failedCnt;
+ /** Marshaller cache listener registered as a static continuous routine when this node is a client. */
+ private ContinuousQueryListener lsnr;
+
/**
* @param plugins Plugins.
* @throws IgniteCheckedException In case of error.
@@ -75,25 +79,58 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
}
/**
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ if (ctx.clientNode()) {
+ lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
+
+ ctx.continuous().registerStaticRoutine(
+ CU.MARSH_CACHE_NAME,
+ lsnr,
+ null,
+ null);
+ }
+ }
+
+ /**
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
- if (!ctx.isDaemon()) {
+ log = ctx.log(MarshallerContextImpl.class);
+
+ cache = ctx.cache().marshallerCache();
+
+ if (ctx.cache().marshallerCache().context().affinityNode()) {
ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
- new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
+ new ContinuousQueryListener(log, workDir),
null,
- ctx.cache().marshallerCache().context().affinityNode(),
+ true,
true,
false
);
}
-
- log = ctx.log(MarshallerContextImpl.class);
-
- cache = ctx.cache().marshallerCache();
+ else {
+ if (lsnr != null) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @SuppressWarnings("unchecked")
+ @Override public void run() {
+ try {
+ Iterable entries = cache.context().continuousQueries().existingEntries(false, null);
+
+ lsnr.onUpdated(entries);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to load marshaller cache entries: " + e, e);
+ }
+ }
+ });
+ }
+ }
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 6e647c5..b4daec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4138,8 +4138,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @return Distributed ignite cache iterator.
+ * @throws IgniteCheckedException If failed.
*/
public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
+ return igniteIterator(ctx.keepBinary());
+ }
+
+ /**
+ * @param keepBinary Keep binary flag.
+ * @return Distributed ignite cache iterator.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary) throws IgniteCheckedException {
GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4147,7 +4157,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
return localIteratorHonorExpirePolicy(opCtx);
- final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
+ final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, keepBinary)
.keepAll(false)
.executeScanQuery();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index fe73f8a..d5756a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -154,9 +154,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
/** Metadata updates collected before metadata cache is initialized. */
private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
- /** */
- private UUID metaCacheQryId;
-
/**
* @param ctx Kernal context.
*/
@@ -260,6 +257,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
/** {@inheritDoc} */
+ @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ if (clientNode && !ctx.isDaemon()) {
+ ctx.continuous().registerStaticRoutine(
+ CU.UTILITY_CACHE_NAME,
+ new MetaDataEntryListener(),
+ new MetaDataEntryFilter(),
+ null);
+ }
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onUtilityCacheStarted() throws IgniteCheckedException {
IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
@@ -276,13 +284,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (clientNode) {
assert !metaDataCache.context().affinityNode();
- metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
- new MetaDataEntryListener(),
- new MetaDataEntryFilter(),
- false,
- true,
- false);
-
while (true) {
ClusterNode oldestSrvNode =
CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
@@ -363,14 +364,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
}
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- if (metaCacheQryId != null)
- metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
- }
-
/**
* @param key Metadata key.
* @param newMeta Metadata.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b042249..c966527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
@@ -46,6 +47,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
@@ -57,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteClosure;
@@ -728,6 +732,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param keepBinary Keep binary flag.
+ * @param filter Filter.
+ * @return Iterable for events created for existing cache entries.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Iterable<CacheEntryEvent<?, ?>> existingEntries(final boolean keepBinary, final CacheEntryEventFilter filter)
+ throws IgniteCheckedException {
+ final Iterator<Cache.Entry<?, ?>> it = cctx.cache().igniteIterator(keepBinary);
+
+ final Cache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+ return new Iterable<CacheEntryEvent<?, ?>>() {
+ @Override public Iterator<CacheEntryEvent<?, ?>> iterator() {
+ return new Iterator<CacheEntryEvent<?, ?>>() {
+ private CacheQueryEntryEvent<?, ?> next;
+
+ {
+ advance();
+ }
+
+ @Override public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override public CacheEntryEvent<?, ?> next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ CacheEntryEvent next0 = next;
+
+ advance();
+
+ return next0;
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void advance() {
+ next = null;
+
+ while (next == null) {
+ if (!it.hasNext())
+ break;
+
+ Cache.Entry e = it.next();
+
+ next = new CacheEntryEventImpl(
+ cache,
+ CREATED,
+ e.getKey(),
+ e.getValue());
+
+ if (filter != null && !filter.evaluate(next))
+ next = null;
+ }
+ }
+ };
+ }
+ };
+ }
+
+ /**
* @param nodes Nodes.
* @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
* otherwise {@code false}.
@@ -1129,4 +1197,71 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.acknowledgeBackupOnTimeout(ctx);
}
}
+
+ /**
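+ * Event built for an existing cache entry: exposes only key and value, no old value, zero update counter.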
+ */
+ private static class CacheEntryEventImpl extends CacheQueryEntryEvent {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private Object key;
+
+ /** */
+ @GridToStringInclude
+ private Object val;
+
+ /**
+ * @param src Event source.
+ * @param evtType Event type.
+ * @param key Key.
+ * @param val Value.
+ */
+ public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) {
+ super(src, evtType);
+
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getPartitionUpdateCounter() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object getOldValue() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isOldValueAvailable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object getKey() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object getValue() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object unwrap(Class cls) {
+ if (cls.isAssignableFrom(getClass()))
+ return cls.cast(this);
+
+ throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheEntryEventImpl.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index cadf1a9..99de507 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -34,6 +35,12 @@ import org.jetbrains.annotations.Nullable;
*/
public interface IgniteCacheObjectProcessor extends GridProcessor {
/**
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException;
+
+ /**
* @see GridComponent#onKernalStart()
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 75d65de..6630c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -235,6 +235,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
+ @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onUtilityCacheStarted() throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fd798df..e96e646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -30,12 +30,15 @@ import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -64,7 +67,6 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -83,6 +85,7 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
@@ -137,6 +140,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** */
private boolean processorStopped;
+ /** Query sequence number for message topic. */
+ private final AtomicLong seq = new AtomicLong();
+
/**
* @param ctx Kernal context.
*/
@@ -255,13 +261,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId = msg.routineId();
unregisterRemote(routineId);
+ }
- if (snd.isClient()) {
- Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
-
- if (clientRoutineMap != null)
- clientRoutineMap.remove(msg.routineId());
- }
+ for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+ if (clientInfo.remove(msg.routineId()) != null)
+ break;
}
}
});
@@ -310,6 +314,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
});
+ ctx.marshallerContext().onContinuousProcessorStarted(ctx);
+
+ ctx.cacheObjects().onContinuousProcessorStarted(ctx);
+
+ ctx.service().onContinuousProcessorStarted(ctx);
+
if (log.isDebugEnabled())
log.debug("Continuous processor started.");
}
@@ -393,16 +403,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
+ if (log.isDebugEnabled()) {
+ log.debug("collectDiscoveryData [node=" + nodeId +
+ ", loc=" + ctx.localNodeId() +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos +
+ ']');
+ }
+
if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
- Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+ Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
- copy.put(e0.getKey(), e0.getValue());
+ cp.put(e0.getKey(), e0.getValue());
+
+ clientInfos0.put(e.getKey(), cp);
+ }
+
+ if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+ Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
- clientInfos0.put(e.getKey(), copy);
+ for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
+ infos.put(e.getKey(), e.getValue());
+
+ clientInfos0.put(ctx.localNodeId(), infos);
}
DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
@@ -430,6 +457,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
DiscoveryData data = (DiscoveryData)obj;
+ if (log.isDebugEnabled()) {
+ log.debug("onDiscoveryDataReceived [joining=" + joiningNodeId +
+ ", rmtNodeId=" + rmtNodeId +
+ ", loc=" + ctx.localNodeId() +
+ ", data=" + data +
+ ']');
+ }
+
if (!ctx.isDaemon() && data != null) {
for (DiscoveryDataItem item : data.items) {
try {
@@ -457,29 +492,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
UUID clientNodeId = entry.getKey();
- Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+ if (!ctx.localNodeId().equals(clientNodeId)) {
+ Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
- for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
- UUID routineId = e.getKey();
- LocalRoutineInfo info = e.getValue();
+ for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+ UUID routineId = e.getKey();
+ LocalRoutineInfo info = e.getValue();
- try {
- if (info.prjPred != null)
- ctx.resource().injectGeneric(info.prjPred);
-
- if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
- if (registerHandler(clientNodeId,
- routineId,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe,
- false))
- info.hnd.onListenerRegistered(routineId, ctx);
+ try {
+ if (info.prjPred != null)
+ ctx.resource().injectGeneric(info.prjPred);
+
+ if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+ if (registerHandler(clientNodeId,
+ routineId,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe,
+ false))
+ info.hnd.onListenerRegistered(routineId, ctx);
+ }
+ }
+ catch (IgniteCheckedException err) {
+ U.error(log, "Failed to register continuous handler.", err);
}
- }
- catch (IgniteCheckedException err) {
- U.error(log, "Failed to register continuous handler.", err);
}
}
@@ -537,6 +574,47 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * Registers routine info to be sent in discovery data during this node join
+ * (to be used for internal queries started from client nodes).
+ *
+ * @param cacheName Cache name.
+ * @param locLsnr Local listener.
+ * @param rmtFilter Remote filter.
+ * @param prjPred Projection predicate.
+ * @return Routine ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ public UUID registerStaticRoutine(
+ String cacheName,
+ CacheEntryUpdatedListener<?, ?> locLsnr,
+ CacheEntryEventSerializableFilter rmtFilter,
+ @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
+ String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
+
+ CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
+ cacheName,
+ TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), seq.incrementAndGet()),
+ locLsnr,
+ rmtFilter,
+ true,
+ false,
+ true,
+ false);
+
+ hnd.internal(true);
+
+ final UUID routineId = UUID.randomUUID();
+
+ LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true);
+
+ locInfos.put(routineId, routineInfo);
+
+ registerMessageListener(hnd);
+
+ return routineId;
+ }
+
+ /**
* @param hnd Handler.
* @param bufSize Buffer size.
* @param interval Time interval.
@@ -610,29 +688,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Register per-routine notifications listener if ordered messaging is used.
- if (hnd.orderedTopic() != null) {
- ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object obj) {
- GridContinuousMessage msg = (GridContinuousMessage)obj;
-
- // Only notification can be ordered.
- assert msg.type() == MSG_EVT_NOTIFICATION;
-
- if (msg.data() == null && msg.dataBytes() != null) {
- try {
- msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to process message (ignoring): " + msg, e);
-
- return;
- }
- }
-
- processNotification(nodeId, msg);
- }
- });
- }
+ registerMessageListener(hnd);
StartFuture fut = new StartFuture(ctx, routineId);
@@ -664,6 +720,35 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param hnd Handler.
+ */
+ private void registerMessageListener(GridContinuousHandler hnd) {
+ if (hnd.orderedTopic() != null) {
+ ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object obj) {
+ GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+ // Only notification can be ordered.
+ assert msg.type() == MSG_EVT_NOTIFICATION;
+
+ if (msg.data() == null && msg.dataBytes() != null) {
+ try {
+ msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process message (ignoring): " + msg, e);
+
+ return;
+ }
+ }
+
+ processNotification(nodeId, msg);
+ }
+ });
+ }
+ }
+
+ /**
* @param routineId Consume ID.
* @return Future.
*/
@@ -807,12 +892,28 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
- for (UUID rmtId : rmtInfos.keySet())
- unregisterRemote(rmtId);
+ if (log.isDebugEnabled()) {
+ log.debug("onDisconnected [rmtInfos=" + rmtInfos +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos + ']');
+ }
+
+ for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+ RemoteRoutineInfo info = e.getValue();
+
+ if (!ctx.localNodeId().equals(info.nodeId) || info.autoUnsubscribe)
+ unregisterRemote(e.getKey());
+ }
rmtInfos.clear();
clientInfos.clear();
+
+ if (log.isDebugEnabled()) {
+ log.debug("after onDisconnected [rmtInfos=" + rmtInfos +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos + ']');
+ }
}
/**
@@ -1038,6 +1139,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
if (doRegister) {
+ if (log.isDebugEnabled())
+ log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']');
+
if (interval > 0) {
IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
@SuppressWarnings("ConstantConditions")
@@ -1152,6 +1256,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
stopLock.unlock();
}
+ if (log.isDebugEnabled())
+ log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + loc + ", rmt=" + remote + ']');
+
if (remote != null)
unregisterHandler(routineId, remote.hnd, false);
else if (loc != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 97d9988..853e6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -144,12 +145,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/** Topology listener. */
private GridLocalEventListener topLsnr = new TopologyListener();
- /** Deployment listener ID. */
- private UUID cfgQryId;
-
- /** Assignment listener ID. */
- private UUID assignQryId;
-
/**
* @param ctx Kernal context.
*/
@@ -166,6 +161,22 @@ public class GridServiceProcessor extends GridProcessorAdapter {
new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false));
}
+ /**
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+ if (ctx.clientNode()) {
+ assert !ctx.isDaemon();
+
+ ctx.continuous().registerStaticRoutine(
+ CU.UTILITY_CACHE_NAME,
+ new ServiceEntriesListener(),
+ null,
+ null);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
ctx.addNodeAttribute(ATTR_SERVICES_COMPATIBILITY_MODE, srvcCompatibilitySysProp);
@@ -197,13 +208,32 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (ctx.deploy().enabled())
ctx.cache().context().deploy().ignoreOwnership(true);
- boolean affNode = cache.context().affinityNode();
+ if (!ctx.clientNode()) {
+ assert cache.context().affinityNode();
+
+ cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(),
+ null,
+ true,
+ true,
+ false);
+ }
+ else {
+ assert !ctx.isDaemon();
- cfgQryId = cache.context().continuousQueries().executeInternalQuery(
- new DeploymentListener(), null, affNode, true, !affNode);
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ Iterable<CacheEntryEvent<?, ?>> entries =
+ cache.context().continuousQueries().existingEntries(false, null);
- assignQryId = cache.context().continuousQueries().executeInternalQuery(
- new AssignmentListener(), null, affNode, true, !affNode);
+ onSystemCacheUpdated(entries);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to load service entries: " + e, e);
+ }
+ }
+ });
+ }
}
finally {
if (ctx.deploy().enabled())
@@ -237,12 +267,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (!ctx.clientNode())
ctx.event().removeLocalEventListener(topLsnr);
- if (cfgQryId != null)
- cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
-
- if (assignQryId != null)
- cache.context().continuousQueries().cancelInternalQuery(assignQryId);
-
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
synchronized (locSvcs) {
@@ -1284,148 +1308,171 @@ public class GridServiceProcessor extends GridProcessorAdapter {
/**
* Service deployment listener.
*/
- private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
+ @SuppressWarnings("unchecked")
+ private class ServiceEntriesListener implements CacheEntryUpdatedListener<Object, Object> {
/** {@inheritDoc} */
@Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
depExe.submit(new BusyRunnable() {
@Override public void run0() {
- boolean firstTime = true;
+ onSystemCacheUpdated(deps);
+ }
+ });
+ }
+ }
- for (CacheEntryEvent<?, ?> e : deps) {
- if (!(e.getKey() instanceof GridServiceDeploymentKey))
- continue;
+ /**
+ * @param evts System cache update events; entries other than service deployments and assignments are ignored.
+ */
+ private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
+ boolean firstTime = true;
- if (firstTime) {
- markCompatibilityStateAsUsed();
+ for (CacheEntryEvent<?, ?> e : evts) {
+ if (e.getKey() instanceof GridServiceDeploymentKey) {
+ if (firstTime) {
+ markCompatibilityStateAsUsed();
- firstTime = false;
- }
+ firstTime = false;
+ }
- GridServiceDeployment dep;
+ processDeployment((CacheEntryEvent)e);
+ }
+ else if (e.getKey() instanceof GridServiceAssignmentsKey) {
+ if (firstTime) {
+ markCompatibilityStateAsUsed();
- try {
- dep = (GridServiceDeployment)e.getValue();
- }
- catch (IgniteException ex) {
- if (X.hasCause(ex, ClassNotFoundException.class))
- continue;
- else
- throw ex;
- }
+ firstTime = false;
+ }
+
+ processAssignment((CacheEntryEvent)e);
+ }
+ }
+ }
- if (dep != null) {
- svcName.set(dep.configuration().getName());
+ /**
+ * @param e Service deployment entry event; a {@code null} value means the service was undeployed.
+ */
+ private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
+ GridServiceDeployment dep;
- // Ignore other utility cache events.
- long topVer = ctx.discovery().topologyVersion();
+ try {
+ dep = e.getValue();
+ }
+ catch (IgniteException ex) {
+ if (X.hasCause(ex, ClassNotFoundException.class))
+ return;
+ else
+ throw ex;
+ }
- ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+ if (dep != null) {
+ svcName.set(dep.configuration().getName());
- if (oldest.isLocal())
- onDeployment(dep, topVer);
- }
- // Handle undeployment.
- else {
- String name = ((GridServiceDeploymentKey)e.getKey()).name();
+ // Ignore other utility cache events.
+ long topVer = ctx.discovery().topologyVersion();
- svcName.set(name);
+ ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
- Collection<ServiceContextImpl> ctxs;
+ if (oldest.isLocal())
+ onDeployment(dep, topVer);
+ }
+ // Handle undeployment.
+ else {
+ String name = e.getKey().name();
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
+ svcName.set(name);
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
- }
+ Collection<ServiceContextImpl> ctxs;
- // Finish deployment futures if undeployment happened.
- GridFutureAdapter<?> fut = depFuts.remove(name);
+ synchronized (locSvcs) {
+ ctxs = locSvcs.remove(name);
+ }
- if (fut != null)
- fut.onDone();
+ if (ctxs != null) {
+ synchronized (ctxs) {
+ cancel(ctxs, ctxs.size());
+ }
+ }
- // Complete undeployment future.
- fut = undepFuts.remove(name);
+ // Finish deployment futures if undeployment happened.
+ GridFutureAdapter<?> fut = depFuts.remove(name);
- if (fut != null)
- fut.onDone();
+ if (fut != null)
+ fut.onDone();
- GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+ // Complete undeployment future.
+ fut = undepFuts.remove(name);
- // Remove assignment on primary node in case of undeploy.
- if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
- try {
- cache.getAndRemove(key);
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
- }
- }
- }
- }
+ if (fut != null)
+ fut.onDone();
+
+ GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+
+ // Remove assignment on primary node in case of undeploy.
+ if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
+ try {
+ cache.getAndRemove(key);
}
- });
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
+ }
+ }
}
+ }
- /**
- * Deployment callback.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- */
- private void onDeployment(final GridServiceDeployment dep, final long topVer) {
- // Retry forever.
- try {
- long newTopVer = ctx.discovery().topologyVersion();
+ /**
+ * Deployment callback.
+ *
+ * @param dep Service deployment.
+ * @param topVer Topology version.
+ */
+ private void onDeployment(final GridServiceDeployment dep, final long topVer) {
+ // Retry forever.
+ try {
+ long newTopVer = ctx.discovery().topologyVersion();
- // If topology version changed, reassignment will happen from topology event.
- if (newTopVer == topVer)
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+ // If topology version changed, reassignment will happen from topology event.
+ if (newTopVer == topVer)
+ reassign(dep, topVer);
+ }
+ catch (IgniteCheckedException e) {
+ if (!(e instanceof ClusterTopologyCheckedException))
+ log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
- long newTopVer = ctx.discovery().topologyVersion();
+ long newTopVer = ctx.discovery().topologyVersion();
- if (newTopVer != topVer) {
- assert newTopVer > topVer;
+ if (newTopVer != topVer) {
+ assert newTopVer > topVer;
- // Reassignment will happen from topology event.
- return;
- }
+ // Reassignment will happen from topology event.
+ return;
+ }
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
+ ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+ private IgniteUuid id = IgniteUuid.randomUuid();
- private long start = System.currentTimeMillis();
+ private long start = System.currentTimeMillis();
- @Override public IgniteUuid timeoutId() {
- return id;
- }
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
+ @Override public long endTime() {
+ return start + RETRY_TIMEOUT;
+ }
- @Override public void onTimeout() {
- if (!busyLock.enterBusy())
- return;
+ @Override public void onTimeout() {
+ if (!busyLock.enterBusy())
+ return;
- try {
- // Try again.
- onDeployment(dep, topVer);
- }
- finally {
- busyLock.leaveBusy();
- }
+ try {
+ // Try again.
+ onDeployment(dep, topVer);
}
- });
- }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
}
}
@@ -1585,79 +1632,59 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
/**
- * Assignment listener.
+ * @param e Service assignments entry event; a {@code null} value means the service was undeployed.
*/
- private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> {
- /** {@inheritDoc} */
- @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException {
- depExe.submit(new BusyRunnable() {
- @Override public void run0() {
- boolean firstTime = true;
-
- for (CacheEntryEvent<?, ?> e : assignCol) {
- if (!(e.getKey() instanceof GridServiceAssignmentsKey))
- continue;
-
- if (firstTime) {
- markCompatibilityStateAsUsed();
+ private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
+ GridServiceAssignments assigns;
- firstTime = false;
- }
-
- GridServiceAssignments assigns;
-
- try {
- assigns = (GridServiceAssignments)e.getValue();
- }
- catch (IgniteException ex) {
- if (X.hasCause(ex, ClassNotFoundException.class))
- continue;
- else
- throw ex;
- }
+ try {
+ assigns = e.getValue();
+ }
+ catch (IgniteException ex) {
+ if (X.hasCause(ex, ClassNotFoundException.class))
+ return;
+ else
+ throw ex;
+ }
- if (assigns != null) {
- svcName.set(assigns.name());
+ if (assigns != null) {
+ svcName.set(assigns.name());
- Throwable t = null;
+ Throwable t = null;
- try {
- redeploy(assigns);
- }
- catch (Error | RuntimeException th) {
- t = th;
- }
+ try {
+ redeploy(assigns);
+ }
+ catch (Error | RuntimeException th) {
+ t = th;
+ }
- GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
+ GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
- if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
- depFuts.remove(assigns.name(), fut);
+ if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
+ depFuts.remove(assigns.name(), fut);
- // Complete deployment futures once the assignments have been stored in cache.
- fut.onDone(null, t);
- }
- }
- // Handle undeployment.
- else {
- String name = ((GridServiceAssignmentsKey)e.getKey()).name();
+ // Complete deployment futures once the assignments have been stored in cache.
+ fut.onDone(null, t);
+ }
+ }
+ // Handle undeployment.
+ else {
+ String name = e.getKey().name();
- svcName.set(name);
+ svcName.set(name);
- Collection<ServiceContextImpl> ctxs;
+ Collection<ServiceContextImpl> ctxs;
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
+ synchronized (locSvcs) {
+ ctxs = locSvcs.remove(name);
+ }
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
- }
- }
- }
+ if (ctxs != null) {
+ synchronized (ctxs) {
+ cancel(ctxs, ctxs.size());
}
- });
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index 4c44adc..0ff5883 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -125,6 +125,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
}
/**
+ * @param stopFromClient If {@code true} stops listener from client node, otherwise from server.
* @throws Exception If failed.
*/
private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
@@ -178,9 +179,11 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
assertTrue(latch.await(5000, MILLISECONDS));
- log.info("Stop listen, should not get remote messages anymore.");
+ Ignite stopFrom = (stopFromClient ? client : srv);
- (stopFromClient ? client : srv).message().stopRemoteListen(opId);
+ log.info("Stop listen, should not get remote messages anymore [from=" + stopFrom.name() + ']');
+
+ stopFrom.message().stopRemoteListen(opId);
srv.message().send(topic, "msg3");
@@ -243,6 +246,59 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
}
/**
+ * @throws Exception If failed.
+ */
+ public void testCacheContinuousQueryReconnectNewServer() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setAutoUnsubscribe(true);
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = clientCache.query(qry);
+
+ continuousQueryReconnect(client, clientCache, lsnr);
+
+ // Check new server registers listener for reconnected client.
+ try (Ignite newSrv = startGrid(serverCount() + 1)) {
+ awaitPartitionMapExchange();
+
+ lsnr.latch = new CountDownLatch(10);
+
+ IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+ for (Integer key : primaryKeys(newSrvCache, 10))
+ newSrvCache.put(key, key);
+
+ assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+ }
+
+ cur.close();
+
+ // Check new server does not register listener for closed query.
+ try (Ignite newSrv = startGrid(serverCount() + 1)) {
+ awaitPartitionMapExchange();
+
+ lsnr.latch = new CountDownLatch(5);
+
+ IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+ for (Integer key : primaryKeys(newSrvCache, 5))
+ newSrvCache.put(key, key);
+
+ assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+ }
+ }
+
+ /**
* @param client Client.
* @param clientCache Client cache.
* @param lsnr Continuous query listener.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 3ae6bb4..3d238af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -209,7 +209,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
for (int i = 0; i < gridCount(); i++) {
GridContinuousProcessor proc = grid(i).context().continuous();
- assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size());
+ assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
new file mode 100644
index 0000000..05537c0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Sanity test to verify there are no unnecessary messages on node start.
+ */
+public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
+ /** IP finder shared by all nodes started by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Client mode flag applied to the configuration of the next started node. */
+ private boolean client;
+
+ /** Set to {@code true} if any node attempts to send a custom discovery event. */
+ private static volatile boolean failed;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoCustomEventsOnStart() throws Exception {
+ failed = false;
+
+ for (int i = 0; i < 5; i++) {
+ client = i % 2 == 1;
+
+ startGrid(i);
+ }
+
+ assertFalse(failed);
+ }
+
+ /**
+ * Discovery SPI that fails the test on any attempt to send a custom event.
+ */
+ static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+ failed = true;
+
+ fail("Should not be called.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
index c3b9cf4..665294d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,47 +35,118 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final int NODE_CNT = 3;
+ private boolean client;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(30);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(1000);
- if (gridName.equals(getTestGridName(NODE_CNT - 1)))
- cfg.setClientMode(true);
+ cfg.setClientMode(client);
return cfg;
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
- startGrids(NODE_CNT);
+ super.afterTest();
}
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeployFromClient() throws Exception {
+ startGrids(3);
+
+ client = true;
+
+ Ignite ignite = startGrid(3);
+
+ checkDeploy(ignite, "service1");
}
/**
* @throws Exception If failed.
*/
- public void testDeployFromClient() throws Exception {
- Ignite ignite = ignite(NODE_CNT - 1);
+ public void testDeployFromClientAfterRouterStop1() throws Exception {
+ startGrid(0);
- assertTrue(ignite.configuration().isClientMode());
+ client = true;
- String svcName = "testService";
+ Ignite ignite = startGrid(1);
+
+ client = false;
+
+ startGrid(2);
+
+ U.sleep(1000);
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ checkDeploy(ignite, "service1");
+
+ startGrid(3);
+
+ for (int i = 0; i < 10; i++)
+ checkDeploy(ignite, "service2-" + i);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeployFromClientAfterRouterStop2() throws Exception {
+ startGrid(0);
+
+ client = true;
+
+ Ignite ignite = startGrid(1);
+
+ client = false;
+
+ startGrid(2);
+
+ client = true;
+
+ startGrid(3);
+
+ client = false;
+
+ startGrid(4);
+
+ U.sleep(1000);
+
+ stopGrid(0);
+
+ awaitPartitionMapExchange();
+
+ checkDeploy(ignite, "service1");
+
+ startGrid(5);
+
+ for (int i = 0; i < 10; i++)
+ checkDeploy(ignite, "service2-" + i);
+ }
+
+ /**
+ * @param client Client node.
+ * @param svcName Service name.
+ * @throws Exception If failed.
+ */
+ private void checkDeploy(Ignite client, String svcName) throws Exception {
+ assertTrue(client.configuration().isClientMode());
CountDownLatch latch = new CountDownLatch(1);
DummyService.exeLatch(svcName, latch);
- ignite.services().deployClusterSingleton(svcName, new DummyService());
+ client.services().deployClusterSingleton(svcName, new DummyService());
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index d0943b5..2632d4c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -125,6 +125,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalMultithre
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxMultiThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxSingleThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxTimeoutSelfTest;
+import org.apache.ignite.internal.processors.continuous.IgniteNoCustomEventsOnNodeStart;
/**
* Test suite.
@@ -253,6 +254,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(CacheEnumOperationsSingleNodeTest.class));
suite.addTest(new TestSuite(CacheEnumOperationsTest.class));
+ suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class));
+
return suite;
}
}