You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/13 14:48:49 UTC
incubator-ignite git commit: # ignite-426 add tests
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-426 1e8d8aed4 -> eee8ea416
# ignite-426 add tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eee8ea41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eee8ea41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eee8ea41
Branch: refs/heads/ignite-426
Commit: eee8ea4169047a2788aa7674aa9ff65a063b3897
Parents: 1e8d8ae
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 13 11:57:36 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 13 15:46:08 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtLocalPartition.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 147 +++---
.../continuous/CacheContinuousQueryManager.java | 36 +-
...acheContinuousQueryFailoverAbstractTest.java | 457 +++++++++++++++++--
...ueryFailoverAtomicPrimaryWriteOrderTest.java | 32 ++
...inuousQueryFailoverAtomicReplicatedTest.java | 32 ++
...ContinuousQueryFailoverTxReplicatedTest.java | 32 ++
.../IgniteCacheQuerySelfTestSuite.java | 3 +
8 files changed, 617 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index b7d4375..a0a75c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -592,11 +592,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @return Next update index.
*/
public long nextContinuousQueryUpdateIndex() {
- long res = contQryUpdIdx.incrementAndGet();
-
- log.info("Next update index [node=" + cctx.gridName() + ", part=" + id + ", idx=" + res + ']');
-
- return res;
+ return contQryUpdIdx.incrementAndGet();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4bd2e1c..98e857b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -39,6 +40,7 @@ import javax.cache.event.*;
import javax.cache.event.EventType;
import java.io.*;
import java.util.*;
+import java.util.concurrent.*;
import static org.apache.ignite.events.EventType.*;
@@ -50,7 +52,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private static final long serialVersionUID = 0L;
/** */
- private static final int BACKUP_ACK_THRESHOLD = 1;
+ private static final int BACKUP_ACK_THRESHOLD = 100;
/** Cache name. */
private String cacheName;
@@ -95,10 +97,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private transient Map<Integer, Long> rcvCntrs;
/** */
- private transient DuplicateEventFilter dupEvtFilter = new DuplicateEventFilter();
+ private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
/** */
- private transient AcknowledgeData ackData = new AcknowledgeData();
+ private transient AcknowledgeBuffer ackBuf;
/**
* Required by {@link Externalizable}.
@@ -121,6 +123,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
* @param ignoreExpired Ignore expired events flag.
* @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
* @param taskHash Task name hash code.
+ * @param locCache {@code True} if local cache.
*/
public CacheContinuousQueryHandler(
String cacheName,
@@ -133,7 +136,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
boolean sync,
boolean ignoreExpired,
int taskHash,
- boolean skipPrimaryCheck) {
+ boolean skipPrimaryCheck,
+ boolean locCache) {
assert topic != null;
assert locLsnr != null;
@@ -149,7 +153,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
- this.rcvCntrs = new HashMap<>();
+ if (locCache)
+ dupEvtFilter = F.alwaysTrue();
+ else {
+ rcvCntrs = new ConcurrentHashMap<>();
+
+ dupEvtFilter = new DuplicateEventFilter();
+ }
}
/** {@inheritDoc} */
@@ -187,8 +197,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
backupQueue = new ConcurrentLinkedDeque8<>();
+ ackBuf = new AcknowledgeBuffer();
+
final boolean loc = nodeId.equals(ctx.localNodeId());
+ assert !skipPrimaryCheck || loc;
+
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
@Override public void onExecution() {
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -217,8 +231,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
GridCacheContext<K, V> cctx = cacheContext(ctx);
- if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
- return;
+ // skipPrimaryCheck is set only when listen locally for replicated cache events.
+ assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
boolean notify = true;
@@ -232,30 +246,36 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
if (notify) {
- if (loc && dupEvtFilter.apply(evt.entry()))
- locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- else {
- try {
- final CacheContinuousQueryEntry entry = evt.entry();
+ try {
+ final CacheContinuousQueryEntry entry = evt.entry();
+
+ if (primary || skipPrimaryCheck) {
+ if (loc) {
+ if (dupEvtFilter.apply(entry)) {
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- if (primary) {
+ if (!skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ }
+ }
+ else {
prepareEntry(cctx, nodeId, entry);
ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
}
- else
- backupQueue.add(entry);
}
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
+ else
+ backupQueue.add(entry);
+ }
+ catch (ClusterTopologyCheckedException ex) {
+ IgniteLogger log = ctx.log(getClass());
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
- }
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
}
if (recordIgniteEvt) {
@@ -292,28 +312,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
while (it.hasNext()) {
CacheContinuousQueryEntry backupEntry = it.next();
- assert backupEntry != null;
-
Long updateIdx = updateIdxs.get(backupEntry.partition());
- if (updateIdx != null) {
- assert backupEntry.updateIndex() <= updateIdx;
-
+ if (updateIdx != null && backupEntry.updateIndex() <= updateIdx)
it.remove();
-
- if (backupEntry.updateIndex() == updateIdx) {
- updateIdxs.remove(backupEntry.partition());
-
- if (updateIdxs.isEmpty())
- break;
- }
- }
}
}
@Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
- ctx.log(getClass()).info("Flush backup queue [topVer=" + topVer + ", queue=" + backupQueue + ']');
-
if (backupQueue.isEmpty())
return;
@@ -333,7 +339,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
@Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
- sendBackupAcknowledge(ackData.acknowledgeOnTimeout(), routineId, ctx);
+ sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
}
@Override public void onPartitionEvicted(int part) {
@@ -469,30 +475,15 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (e.updateIndex() > cntr0) {
// TODO IGNITE-426: remove assert.
- if (e.updateIndex() != cntr0 + 1)
- System.out.println(Thread.currentThread().getName() +
- "Invalid entry [part=" + e.partition() + ", cntr=" + cntr + ", e=" + e + ']');
-
assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']';
- System.out.println(Thread.currentThread().getName() +
- " update cntr [part=" + part + ", idx=" + e.updateIndex() + ", e=" + e + ']');
-
rcvCntrs.put(part, e.updateIndex());
}
- else {
- System.out.println(Thread.currentThread().getName() +
- " ignore entry [cntr=" + cntr0 + ", idx=" + e.updateIndex() + ", e=" + e + ']');
-
+ else
return false;
- }
}
- else {
- System.out.println(Thread.currentThread().getName() +
- " update cntr [part=" + part + ", idx=" + e.updateIndex() + ", e=" + e + ']');
-
+ else
rcvCntrs.put(part, e.updateIndex());
- }
return true;
}
@@ -525,7 +516,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
@Override public void onBatchAcknowledged(final UUID routineId,
GridContinuousBatch batch,
final GridKernalContext ctx) {
- sendBackupAcknowledge(ackData.onAcknowledged(batch), routineId, ctx);
+ sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
}
/**
@@ -651,14 +642,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** */
- private static class AcknowledgeData {
+ private static class AcknowledgeBuffer {
/** */
private int size;
/** */
+ @GridToStringInclude
private Map<Integer, Long> updateIdxs = new HashMap<>();
/** */
+ @GridToStringInclude
private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
/**
@@ -668,24 +661,42 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
@SuppressWarnings("unchecked")
@Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
onAcknowledged(GridContinuousBatch batch) {
- // TODO: IGNITE-426: check if it is called from nio thread in correct order.
size += batch.size();
Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
- for (CacheContinuousQueryEntry e : entries) {
- topVers.add(e.topologyVersion());
+ for (CacheContinuousQueryEntry e : entries)
+ addEntry(e);
- Long cntr0 = updateIdxs.get(e.partition());
+ return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+ }
- if (cntr0 == null || e.updateIndex() > cntr0)
- updateIdxs.put(e.partition(), e.updateIndex());
- }
+ /**
+ * @param e Entry.
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+ onAcknowledged(CacheContinuousQueryEntry e) {
+ size++;
+
+ addEntry(e);
return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
}
/**
+ * @param e Entry.
+ */
+ private void addEntry(CacheContinuousQueryEntry e) {
+ topVers.add(e.topologyVersion());
+
+ Long cntr0 = updateIdxs.get(e.partition());
+
+ if (cntr0 == null || e.updateIndex() > cntr0)
+ updateIdxs.put(e.partition(), e.updateIndex());
+ }
+
+ /**
* @return Non-null tuple if acknowledge should be sent to backups.
*/
@Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
@@ -697,6 +708,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
* @return Tuple with acknowledge information.
*/
private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+ assert size > 0;
+
Map<Integer, Long> idxs = new HashMap<>(updateIdxs);
IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
@@ -711,7 +724,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(AcknowledgeData.class, this);
+ return S.toString(AcknowledgeBuffer.class, this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9eae419..cc8f77b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -58,6 +58,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/** */
private static final byte EXPIRED_FLAG = 0b1000;
+ /** */
+ private static final long BACKUP_ACK_FREQ = 5000;
+
/** Listeners. */
private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
@@ -94,6 +97,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.cleanupBackupQueue(msg.updateIndexes());
}
});
+
+ cctx.time().schedule(new Runnable() {
+ @Override public void run() {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+
+ for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+ lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+ }
+ }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
}
/** {@inheritDoc} */
@@ -166,11 +179,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (!hasNewVal && !hasOldVal)
return;
- GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false);
+ long updateIdx;
- assert locPart != null;
+ if (!cctx.isLocal()) {
+ GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false);
- long updateIdx = locPart.nextContinuousQueryUpdateIndex();
+ assert locPart != null;
+
+ updateIdx = locPart.nextContinuousQueryUpdateIndex();
+ }
+ else
+ updateIdx = 0;
EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
@@ -206,12 +225,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
updateIdx,
topVer);
- log.info("Created entry [node=" + cctx.gridName() +
- ", primary=" + primary +
- ", preload=" + preload +
- ", part=" + e.partition() +
- ", idx=" + updateIdx + ']');
-
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -405,7 +418,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.onPartitionEvicted(part);
}
-
/**
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
@@ -480,7 +492,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
sync,
ignoreExpired,
taskNameHash,
- skipPrimaryCheck);
+ skipPrimaryCheck,
+ cctx.isLocal());
UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
autoUnsubscribe, grp.predicate()).get();
@@ -702,6 +715,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
* @param impl Listener.
+ * @param log Logger.
*/
JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) {
assert impl != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 1eb3ae7..3afb01c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -23,8 +23,10 @@ import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
@@ -33,13 +35,17 @@ import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
import javax.cache.event.*;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
/**
@@ -47,7 +53,10 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
*/
public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest {
/** */
- protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int BACKUP_ACK_THRESHOLD = 100;
/** */
private static volatile boolean err;
@@ -62,6 +71,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
TestCommunicationSpi commSpi = new TestCommunicationSpi();
commSpi.setIdleConnectionTimeout(100);
@@ -72,6 +84,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
ccfg.setCacheMode(cacheMode());
ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setAtomicWriteOrderMode(writeOrderMode());
ccfg.setBackups(backups);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -83,6 +96,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
}
/** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60_000;
+ }
+
+ /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
@@ -107,6 +125,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
protected abstract CacheAtomicityMode atomicityMode();
/**
+ * @return Write order mode for atomic cache.
+ */
+ protected CacheAtomicWriteOrderMode writeOrderMode() {
+ return CLOCK;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testOneBackup() throws Exception {
@@ -117,6 +142,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
public void testThreeBackups() throws Exception {
+ if (cacheMode() == REPLICATED)
+ return;
+
checkBackupQueue(3);
}
@@ -129,7 +157,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final int SRV_NODES = 4;
- startGrids(SRV_NODES);
+ startGridsMultiThreaded(SRV_NODES);
client = true;
@@ -139,16 +167,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
- assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+ if (cacheMode() != REPLICATED)
+ assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
Affinity<Object> aff = qryClient.affinity(null);
- CacheEventListener lsnr = new CacheEventListener();
+ CacheEventListener1 lsnr = new CacheEventListener1();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
- qry.setAutoUnsubscribe(true);
-
qry.setLocalListener(lsnr);
QueryCursor<?> cur = qryClientCache.query(qry);
@@ -188,8 +215,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
if (!latch.await(5, SECONDS))
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
-
- awaitPartitionMapExchange();
}
for (int i = 0; i < SRV_NODES - 1; i++) {
@@ -197,8 +222,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
Ignite ignite = startGrid(i);
- awaitPartitionMapExchange();
-
IgniteCache<Object, Object> cache = ignite.cache(null);
List<Integer> keys = testKeys(cache, PARTS);
@@ -219,7 +242,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
cur.close();
- assertFalse(err);
+ assertFalse("Unexpected error during test, see log for details.", err);
}
/**
@@ -265,83 +288,427 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @throws Exception If failed.
*/
- public void testBackupQueueCleanup() throws Exception {
- startGrids(2);
+ public void testBackupQueueCleanupClientQuery() throws Exception {
+ startGridsMultiThreaded(2);
- Ignite ignite0 = ignite(0);
- Ignite ignite1 = ignite(1);
+ client = true;
- CacheEventListener lsnr = new CacheEventListener();
+ Ignite qryClient = startGrid(2);
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+ CacheEventListener1 lsnr = new CacheEventListener1();
- qry.setAutoUnsubscribe(true);
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
qry.setLocalListener(lsnr);
- QueryCursor<?> cur = ignite1.cache(null).query(qry);
+ QueryCursor<?> cur = qryClient.cache(null).query(qry);
+
+ final Collection<Object> backupQueue = backupQueue(ignite(1));
- IgniteCache<Object, Object> cache0 = ignite0.cache(null);
+ assertEquals(0, backupQueue.size());
- final int KEYS = 1;
+ IgniteCache<Object, Object> cache0 = ignite(0).cache(null);
- List<Integer> keys = primaryKeys(cache0, KEYS);
+ List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD);
CountDownLatch latch = new CountDownLatch(keys.size());
lsnr.latch = latch;
+ for (Integer key : keys) {
+ log.info("Put: " + key);
+
+ cache0.put(key, key);
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
+ if (!latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+ keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2);
+
+ latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
for (Integer key : keys)
cache0.put(key, key);
+ final long ACK_FREQ = 5000;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, ACK_FREQ + 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
if (!latch.await(5, SECONDS))
fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
- Thread.sleep(10_000);
+ cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueueCleanupServerQuery() throws Exception {
+ Ignite qryClient = startGridsMultiThreaded(2);
+
+ CacheEventListener1 lsnr = new CacheEventListener1();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+ QueryCursor<?> cur = cache.query(qry);
+
+ final Collection<Object> backupQueue = backupQueue(ignite(1));
+
+ assertEquals(0, backupQueue.size());
+
+ List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
+
+ CountDownLatch latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ for (Integer key : keys) {
+ log.info("Put: " + key);
+
+ cache.put(key, key);
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
+ if (!latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+ cur.close();
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @return Backup queue for test query.
+ */
+ private Collection<Object> backupQueue(Ignite ignite) {
+ GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
+
+ ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
+
+ Collection<Object> backupQueue = null;
+
+ for (Object info : infos.values()) {
+ GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
+
+ if (hnd.isForQuery() && hnd.cacheName() == null) {
+ backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+
+ break;
+ }
+ }
+
+ assertNotNull(backupQueue);
+
+ return backupQueue;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _testFailover() throws Exception {
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+ final CacheEventListener2 lsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClientCache.query(qry);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ final int idx = SRV_NODES + 1;
+
+ while (!stop.get() && !err) {
+ log.info("Start node: " + idx);
+
+ startGrid(idx);
+
+ Thread.sleep(2000);
+
+ log.info("Stop node: " + idx);
+
+ stopGrid(idx);
+
+ Thread.sleep(1000);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ assertTrue(checkLatch.compareAndSet(null, latch));
+
+ if (!stop.get()) {
+ log.info("Wait for event check.");
+
+ assertTrue(latch.await(1, MINUTES));
+ }
+ }
+
+ return null;
+ }
+ });
+
+ final Map<Integer, Integer> vals = new HashMap<>();
+
+ try {
+ long stopTime = System.currentTimeMillis() + 3 * 60_000;
+
+ final int PARTS = qryClient.affinity(null).partitions();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer key = rnd.nextInt(PARTS);
+
+ Integer val = vals.get(key);
+
+ if (val == null)
+ val = 0;
+ else
+ val = val + 1;
+
+ qryClientCache.put(key, val);
+
+ vals.put(key, val);
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null) {
+ log.info("Check events.");
+
+ checkLatch.set(null);
+
+ boolean success = false;
+
+ try {
+ if (err)
+ break;
+
+ boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, vals, lsnr);
+ }
+ }, 10_000);
+
+ if (!check)
+ assertTrue(checkEvents(true, vals, lsnr));
+
+ success = true;
+
+ log.info("Events checked.");
+ }
+ finally {
+ if (!success)
+ err = true;
+
+ latch.countDown();
+ }
+ }
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null)
+ latch.countDown();
+
+ restartFut.get();
+
+ boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, vals, lsnr);
+ }
+ }, 10_000);
+
+ if (!check)
+ assertTrue(checkEvents(true, vals, lsnr));
cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @param logAll If {@code true} logs all unexpected values.
+ * @param vals Expected values.
+ * @param lsnr Listener.
+ * @return Check status.
+ */
+ private boolean checkEvents(boolean logAll, Map<Integer, Integer> vals, CacheEventListener2 lsnr) {
+ assertTrue(!vals.isEmpty());
+
+ ConcurrentHashMap<Integer, Integer> lsnrVals = lsnr.vals;
+
+ ConcurrentHashMap<Integer, Integer> lsnrCntrs = lsnr.cntrs;
+
+ boolean pass = true;
+
+ for (Map.Entry<Integer, Integer> e : vals.entrySet()) {
+ Integer key = e.getKey();
+
+ Integer lsnrVal = lsnrVals.get(key);
+ Integer expVal = e.getValue();
+
+ if (!expVal.equals(lsnrVal)) {
+ pass = false;
+
+ log.info("Unexpected value [key=" + key + ", val=" + lsnrVal + ", expVal=" + expVal + ']');
+
+ if (!logAll)
+ return false;
+ }
+
+ Integer lsnrCntr = lsnrCntrs.get(key);
+ Integer expCntr = expVal + 1;
+
+ if (!expCntr.equals(lsnrCntr)) {
+ pass = false;
+
+ log.info("Unexpected events count [key=" + key + ", val=" + lsnrCntr + ", expVal=" + expCntr + ']');
+
+ if (!logAll)
+ return false;
+ }
+ }
+
+ return pass;
}
/**
*
*/
- private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
/** */
private volatile CountDownLatch latch;
/** */
- @IgniteInstanceResource
- private Ignite ignite;
+ @LoggerResource
+ private IgniteLogger log;
/** {@inheritDoc} */
@Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
- for (CacheEntryEvent<?, ?> evt : evts) {
- ignite.log().info("Received cache event: " + evt);
+ try {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ CountDownLatch latch = this.latch;
- CountDownLatch latch = this.latch;
+ log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null));
- testAssert(evt, latch != null);
- testAssert(evt, latch.getCount() > 0);
+ assertTrue(latch != null);
+ assertTrue(latch.getCount() > 0);
- latch.countDown();
+ latch.countDown();
- if (latch.getCount() == 0)
- this.latch = null;
+ if (latch.getCount() == 0)
+ this.latch = null;
+ }
+ }
+ catch (Throwable e) {
+ err = true;
+
+ log.error("Unexpected error", e);
}
}
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
- /**
- * @param evt Event.
- * @param cond Condition to check.
- */
- private void testAssert(CacheEntryEvent evt, boolean cond) {
- if (!cond) {
- ignite.log().info("Unexpected event: " + evt);
+ /** */
+ private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
- err = true;
+ /** */
+ private final ConcurrentHashMap<Integer, Integer> cntrs = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
+ throws CacheEntryListenerException {
+ try {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ Integer key = (Integer)evt.getKey();
+ Integer val = (Integer)evt.getValue();
+
+ assertNotNull(key);
+ assertNotNull(val);
+
+ Integer prevVal = vals.get(key);
+
+ if (prevVal != null) {
+ assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
+ assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+ }
+ else {
+ assertEquals("Unexpected event: " + evt, (Object)0, val);
+ assertNull("Unexpected event: " + evt, evt.getOldValue());
+ }
+
+ vals.put(key, val);
+
+ Integer cntr = cntrs.get(key);
+
+ if (cntr == null)
+ cntr = 1;
+ else
+ cntr = cntr + 1;
+
+ cntrs.put(key, cntr);
+ }
}
+ catch (Throwable e) {
+ err = true;
- assertTrue(cond);
+ log.error("Unexpected error", e);
+ }
}
}
@@ -357,7 +724,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
private volatile boolean skipMsg;
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
if (skipMsg && msg instanceof GridIoMessage) {
Object msg0 = ((GridIoMessage)msg).message();
@@ -369,7 +736,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
}
}
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
new file mode 100644
index 0000000..6515b21
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+ return PRIMARY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
new file mode 100644
index 0000000..c8209d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
new file mode 100644
index 0000000..3ac0c64
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverTxReplicatedTest extends CacheContinuousQueryFailoverTxTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2240226..8a7039e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -99,7 +99,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverAtomicTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);