You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/11 17:03:40 UTC
incubator-ignite git commit: # ignite-426 backup queue flush, test
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-426 fa494ff1c -> 31179c17c
# ignite-426 backup queue flush, test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31179c17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31179c17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31179c17
Branch: refs/heads/ignite-426
Commit: 31179c17c01dc80e7f6d2584c84690587d86a61c
Parents: fa494ff
Author: sboikov <sb...@gridgain.com>
Authored: Tue Aug 11 18:03:28 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Aug 11 18:03:28 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 1 -
.../GridDhtPartitionsExchangeFuture.java | 5 +
.../continuous/CacheContinuousQueryEvent.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 100 ++++++-
.../CacheContinuousQueryListener.java | 9 +
.../continuous/CacheContinuousQueryManager.java | 34 ++-
.../continuous/GridContinuousMessage.java | 1 +
.../continuous/GridContinuousProcessor.java | 65 ++++-
...acheContinuousQueryFailoverAbstractTest.java | 272 +++++++++++++++++++
.../CacheContinuousQueryFailoverAtomicTest.java | 38 +++
11 files changed, 506 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 2864fa4..9e7d930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -96,7 +96,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
/** Continuous query update index. */
- private final AtomicLong contQueryUpdIdx = new AtomicLong();
+ private final AtomicLong contQryUpdIdx = new AtomicLong();
/**
* @param cctx Context.
@@ -590,7 +590,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @return Next update index.
*/
public long nextContinuousQueryUpdateIndex() {
- return contQueryUpdIdx.incrementAndGet();
+ return contQryUpdIdx.incrementAndGet();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 63edcaa..601f1d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -450,5 +450,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbf6b40..99c7fc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -757,6 +757,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -767,6 +769,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (drCacheCtx.isDrEnabled())
drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
+ if (topChanged)
+ cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+
// Partition release future is done so we can flush the write-behind store.
cacheCtx.store().forceFlush();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 4a0d6f7..96fd4ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public K getKey() {
+ @Override public K getKey() {
return e.key().value(cctx.cacheObjectContext(), false);
}
@@ -69,8 +68,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
}
/** {@inheritDoc} */
- @Override
- public V getOldValue() {
+ @Override public V getOldValue() {
return CU.value(e.oldValue(), cctx, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 88ae39b..8e308a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.continuous.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -85,7 +86,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private transient boolean skipPrimaryCheck;
/** Backup queue. */
- private transient Queue<CacheContinuousQueryEntry> backupQueue;
+ private transient Collection<CacheContinuousQueryEntry> backupQueue;
+
+ /** Last received update index per partition; consulted to decide whether an entry was already seen. */
+ private transient Map<Integer, Long> rcvCntrs;
+
+ /** Predicate that rejects entries whose update index was already received for their partition. */
+ private transient DuplicateEventFilter dupEvtFilter = new DuplicateEventFilter();
/**
* Required by {@link Externalizable}.
@@ -135,6 +142,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+
+ this.rcvCntrs = new HashMap<>();
}
/** {@inheritDoc} */
@@ -216,23 +225,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
if (notify) {
- if (loc)
+ if (loc && dupEvtFilter.apply(evt.entry()))
locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
else {
try {
final CacheContinuousQueryEntry entry = evt.entry();
if (primary) {
- if (ctx.config().isPeerClassLoadingEnabled() && ctx.discovery().node(nodeId) != null) {
- entry.prepareMarshal(cctx);
-
- GridCacheDeploymentManager depMgr =
- ctx.cache().internalCache(cacheName).context().deploy();
-
- depMgr.prepare(entry);
- }
- else
- entry.prepareMarshal(cctx);
+ prepareEntry(cctx, nodeId, entry);
ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
}
@@ -304,6 +304,25 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
+ @Override public void flushBackupQueue(GridKernalContext ctx) {
+ if (backupQueue.isEmpty())
+ return; // Nothing buffered, so no notification is sent.
+
+ try {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ for (CacheContinuousQueryEntry e : backupQueue)
+ prepareEntry(cctx, nodeId, e); // Prepare each entry for marshalling before it is sent.
+
+ ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+
+ backupQueue.clear(); // Reached only if preparing and sending did not throw IgniteCheckedException.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e); // Logged, not rethrown; the queue is left as is.
+ }
+ }
+
@Override public boolean oldValueRequired() {
return oldValRequired;
}
@@ -325,6 +344,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return mgr.registerListener(routineId, lsnr, internal);
}
+ /**
+ * @param cctx Context.
+ * @param nodeId ID of the node that started routine.
+ * @param entry Entry to prepare for marshalling; also passed to {@code cctx.deploy().prepare()} when peer class loading is enabled and the node is known to discovery.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry)
+ throws IgniteCheckedException {
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) {
+ entry.prepareMarshal(cctx);
+
+ cctx.deploy().prepare(entry);
+ }
+ else
+ entry.prepareMarshal(cctx);
+ }
+
/** {@inheritDoc} */
@Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
// No-op.
@@ -392,12 +428,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
@Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
return new CacheContinuousQueryEvent<>(cache, cctx, e);
}
- }
+ },
+ dupEvtFilter
);
locLsnr.onUpdated(evts);
}
+ /**
+ * @param e Entry; its update index is compared with the last index recorded for its partition.
+ * @return {@code True} if listener should be notified, {@code false} if the index was already received.
+ */
+ private boolean notifyListener(CacheContinuousQueryEntry e) {
+ Integer part = e.partition();
+
+ Long cntr = rcvCntrs.get(part);
+
+ if (cntr != null) {
+ long cntr0 = cntr;
+
+ if (e.updateIndex() > cntr0) {
+ // TODO IGNITE-426: remove assert.
+ assert e.updateIndex() == cntr0 + 1 : "Invalid entry [e=" + e + ", cntr=" + cntr + ']';
+
+ rcvCntrs.put(part, e.updateIndex()); // Advance the counter so later duplicates of this index are rejected.
+ }
+ else
+ return false;
+ }
+ else
+ rcvCntrs.put(part, e.updateIndex());
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
@@ -530,6 +594,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/**
+ * Predicate that delegates to {@code notifyListener}, accepting only entries the listener should be notified about.
+ */
+ private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(CacheContinuousQueryEntry e) {
+ return notifyListener(e);
+ }
+ }
+
+ /**
* Deployable object.
*/
private static class DeployableObject implements Externalizable {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index d5d5ff8..d955aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import org.apache.ignite.internal.*;
+
import java.util.*;
/**
@@ -50,6 +52,13 @@ interface CacheContinuousQueryListener<K, V> {
public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
/**
+ * Flushes backup queue.
+ *
+ * @param ctx Context.
+ */
+ public void flushBackupQueue(GridKernalContext ctx);
+
+ /**
* @return Whether old value is required.
*/
public boolean oldValueRequired();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c6a16c9..ce2b111 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -127,7 +127,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param primary {@code True} if called on primary node.
* @param preload Whether update happened during preloading.
+ * @param topVer Topology version.
* @throws IgniteCheckedException In case of error.
*/
public void onEntryUpdated(GridCacheEntryEx e,
@@ -141,8 +143,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
{
assert e != null;
assert key != null;
-
- assert Thread.holdsLock(e);
+ assert Thread.holdsLock(e) : e;
boolean internal = e.isInternal() || !e.context().userCache();
@@ -175,7 +176,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean initialized = false;
- boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+ boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
if (preload && !lsnr.notifyExisting())
@@ -204,6 +205,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
e.partition(),
updateIdx);
+ log.info("Created entry [node=" + cctx.gridName() +
+ ", primary=" + primary +
+ ", preload=" + preload +
+ ", part=" + e.partition() +
+ ", idx=" + updateIdx + ']');
+
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -221,8 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
throws IgniteCheckedException {
assert e != null;
assert key != null;
-
- assert Thread.holdsLock(e);
+ assert Thread.holdsLock(e) : e;
if (e.isInternal())
return;
@@ -374,6 +380,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param topVer Topology version (not used by the current implementation).
+ */
+ public void beforeExchange(AffinityTopologyVersion topVer) {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.flushBackupQueue(cctx.kernalContext()); // Flush every listener in lsnrs...
+
+ for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+ lsnr.flushBackupQueue(cctx.kernalContext()); // ...and every listener in intLsnrs.
+ }
+
+ /**
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
* @param bufSize Buffer size.
@@ -493,7 +510,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
GridCacheEntryEx e = it.next();
CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
- cctx.cacheId(), CREATED, e.key(), e.rawGet(), null, 0, 0);
+ cctx.cacheId(),
+ CREATED,
+ e.key(),
+ e.rawGet(),
+ null,
+ 0, 0);
next = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
index fe50fd8..4c7f8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
@@ -49,6 +49,7 @@ public class GridContinuousMessage implements Message {
private Object data;
/** */
+ @GridToStringInclude
@GridDirectCollection(Message.class)
private Collection<Message> msgs;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 457f150..7e71b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -593,6 +593,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/**
* @param nodeId ID of the node that started routine.
* @param routineId Routine ID.
+ * @param objs Notification objects; all are added to the routine's current batch, which is then sent at once.
+ * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void addBackupNotification(UUID nodeId,
+ final UUID routineId,
+ Collection<?> objs,
+ @Nullable Object orderedTopic)
+ throws IgniteCheckedException {
+ if (processorStopped)
+ return; // Nothing is sent once the processor is marked as stopped.
+
+ final RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+ if (info != null) { // Silently does nothing when no remote info is registered for routineId.
+ final GridContinuousBatch batch = info.addAll(objs);
+
+ sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null); // Last argument (ack closure) is null.
+ }
+ }
+
+ /**
+ * @param nodeId ID of the node that started routine.
+ * @param routineId Routine ID.
* @param obj Notification object.
* @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
* @param sync If {@code true} then waits for event acknowledgment.
@@ -642,7 +666,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
final GridContinuousBatch batch = info.add(obj);
if (batch != null) {
- CI1<IgniteException> ackClosure = new CI1<IgniteException>() {
+ CI1<IgniteException> ackC = new CI1<IgniteException>() {
@Override public void apply(IgniteException e) {
if (e == null) {
try {
@@ -655,7 +679,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
};
- sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackClosure);
+ sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC);
}
}
}
@@ -904,7 +928,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
};
- sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg,
+ sendNotification(nodeId,
+ routineId,
+ null,
+ toSnd,
+ hnd.orderedTopic(),
+ msg,
ackClosure);
}
catch (ClusterTopologyCheckedException ignored) {
@@ -1212,6 +1241,36 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param objs Objects to add; asserted to be non-null and non-empty.
+ * @return Batch to send: the batch that received {@code objs}; a newly created batch takes its place.
+ */
+ GridContinuousBatch addAll(Collection<?> objs) {
+ assert objs != null;
+ assert objs.size() > 0;
+
+ GridContinuousBatch toSnd = null;
+
+ lock.writeLock().lock();
+
+ try {
+ for (Object obj : objs)
+ batch.add(obj);
+
+ toSnd = batch; // Unlike add(obj), the batch is always handed back for sending.
+
+ batch = hnd.createBatch();
+
+ if (interval > 0)
+ lastSndTime = U.currentTimeMillis();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ return toSnd;
+ }
+
+ /**
* @param obj Object to add.
* @return Batch to send or {@code null} if there is nothing to send for now.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
new file mode 100644
index 0000000..fe3c817
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.event.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
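+ * Base test checking that continuous query notifications are still received when server nodes are stopped and restarted.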
+ */
+public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(cacheMode());
+ ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueue() throws Exception {
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+ CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setAutoUnsubscribe(true);
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClientCache.query(qry);
+
+ int PARTS = 1;
+
+ for (int i = 0; i < SRV_NODES - 1; i++) {
+ log.info("Stop iteration: " + i);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+ Ignite ignite = ignite(i);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ List<Integer> keys = testKeys(cache, PARTS);
+
+ lsnr.latch = new CountDownLatch(keys.size());
+
+ boolean first = true;
+
+ for (Integer key : keys) {
+ log.info("Put [node=" + ignite.name() + ", key=" + key + ']');
+
+ cache.put(key, key);
+
+ if (first) {
+ spi.skipMsg = true;
+
+ first = false;
+ }
+ }
+
+ stopGrid(i);
+
+ assertTrue("Failed to wait for notifications", lsnr.latch.await(5, SECONDS));
+
+ lsnr.latch = null;
+
+ awaitPartitionMapExchange();
+ }
+
+ for (int i = 0; i < SRV_NODES - 1; i++) {
+ log.info("Start iteration: " + i);
+
+ Ignite ignite = startGrid(i);
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ List<Integer> keys = testKeys(cache, PARTS);
+
+ lsnr.latch = new CountDownLatch(keys.size());
+
+ for (Integer key : keys) {
+ log.info("Put [node=" + ignite.name() + ", key=" + key + ']');
+
+ cache.put(key, key);
+ }
+
+ if (!lsnr.latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+ lsnr.latch = null;
+ }
+
+ cur.close();
+ }
+
+ /**
+ * @param cache Cache.
+ * @param parts Number of partitions.
+ * @return Keys.
+ */
+ private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts) {
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ List<Integer> res = new ArrayList<>();
+
+ Affinity<Object> aff = ignite.affinity(cache.getName());
+
+ ClusterNode node = ignite.cluster().localNode();
+
+ int[] nodeParts = aff.primaryPartitions(node);
+
+ final int KEYS_PER_PART = 1;
+
+ for (int i = 0; i < parts; i++) {
+ int part = nodeParts[i];
+
+ int cnt = 0;
+
+ for (int key = 0; key < 100_000; key++) {
+ if (aff.partition(key) == part && aff.isPrimary(node, key)) {
+ res.add(key);
+
+ if (++cnt == KEYS_PER_PART)
+ break;
+ }
+ }
+
+ assertEquals(KEYS_PER_PART, cnt);
+ }
+
+ assertEquals(parts * KEYS_PER_PART, res.size());
+
+ return res;
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ ignite.log().info("Received cache event: " + evt);
+
+ CountDownLatch latch = this.latch;
+
+ assertTrue(latch != null);
+ assertTrue(latch.getCount() > 0);
+
+ latch.countDown();
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ private volatile boolean skipMsg;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ throws IgniteSpiException {
+ if (skipMsg && msg instanceof GridIoMessage) {
+ Object msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridContinuousMessage) {
+ log.info("Skip continuous message: " + msg0);
+
+ return;
+ }
+ }
+
+ super.sendMessage(node, msg, ackClosure);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
new file mode 100644
index 0000000..8b38b7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Continuous query failover test for a {@code PARTITIONED} cache with {@code ATOMIC} atomicity mode.
+ */
+public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}