You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/28 14:15:18 UTC
[42/50] [abbrv] ignite git commit: IGNITE-426 Added cache continuos
query probe. Implemented for TX.
IGNITE-426 Added cache continuos query probe. Implemented for TX.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/26408c4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/26408c4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/26408c4b
Branch: refs/heads/ignite-426-2-reb
Commit: 26408c4b17f5a3462701ef1af98a0994c1246c9d
Parents: 88ecfd4
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Oct 21 15:56:36 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Oct 28 15:24:23 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 8 +-
.../processors/cache/GridCacheMapEntry.java | 39 ++--
.../cache/GridCacheUpdateTxResult.java | 30 ++-
.../GridDistributedTxRemoteAdapter.java | 22 +-
.../dht/GridDhtPartitionTopologyImpl.java | 2 +
.../distributed/dht/GridDhtTxFinishFuture.java | 12 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 89 +++++++-
.../continuous/CacheContinuousQueryHandler.java | 30 +--
.../continuous/CacheContinuousQueryManager.java | 3 -
.../cache/transactions/IgniteTxEntry.java | 34 ++-
.../cache/transactions/IgniteTxHandler.java | 3 +
.../transactions/IgniteTxLocalAdapter.java | 18 +-
.../cache/transactions/IgniteTxRemoteEx.java | 7 +-
.../continuous/GridContinuousProcessor.java | 3 -
.../processors/cache/GridCacheTestEntryEx.java | 8 +-
...acheContinuousQueryFailoverAbstractTest.java | 209 ++++++++++++++-----
...ueryFailoverAtomicPrimaryWriteOrderTest.java | 14 +-
...inuousQueryFailoverAtomicReplicatedTest.java | 3 +-
.../CacheContinuousQueryFailoverAtomicTest.java | 39 ----
...CacheContinuousQueryClientReconnectTest.java | 187 +++++++++++++++++
.../IgniteCacheContinuousQueryClientTest.java | 157 ++++++++++++--
...cheContinuousQueryClientTxReconnectTest.java | 32 +++
.../IgniteCacheQuerySelfTestSuite.java | 14 +-
.../yardstick/cache/CacheEntryEventProbe.java | 156 ++++++++++++++
24 files changed, 944 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index eb40d20..aa6ea18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -382,7 +382,8 @@ public interface GridCacheEntryEx {
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -419,7 +420,8 @@ public interface GridCacheEntryEx {
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updatePartIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -1016,4 +1018,4 @@ public interface GridCacheEntryEx {
* Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
*/
public void onUnlock();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d23bdf2..2c3bf8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1060,7 +1060,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject old;
@@ -1158,6 +1159,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateIdx0 = nextPartIndex(topVer);
+ if (updateIdx != null && updateIdx != 0)
+ updateIdx0 = updateIdx;
+
update(val, expireTime, ttl, newVer);
drReplicate(drType, val, newVer);
@@ -1183,7 +1187,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
subjId, null, taskName);
}
- if (!isNear())
+ if (!isNear() &&
+ // Ignore events on backups for one phase commit.
+ !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, false);
@@ -1200,7 +1206,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (intercept)
cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0));
- return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) :
+ return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateIdx0) :
new GridCacheUpdateTxResult(false, null);
}
@@ -1227,7 +1233,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.transactional();
@@ -1322,8 +1329,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateIdx0 = nextPartIndex(topVer);
-// if (updateIdx != null)
-// updateIdx0 = updateIdx;
+ if (updateIdx != null && updateIdx != 0)
+ updateIdx0 = updateIdx;
drReplicate(drType, null, newVer);
@@ -1357,7 +1364,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
taskName);
}
- if (!isNear())
+ if (!isNear() &&
+ // Ignore events on backups for one phase commit.
+ !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, true);
@@ -1408,7 +1417,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
ret = old;
- return new GridCacheUpdateTxResult(true, ret);
+ return new GridCacheUpdateTxResult(true, ret, updateIdx0);
}
else
return new GridCacheUpdateTxResult(false, null);
@@ -1976,7 +1985,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx0);
}
}
else
@@ -2053,7 +2062,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0);
+ -1);
}
}
@@ -2101,7 +2110,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx);
}
}
else
@@ -3217,13 +3226,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else if (deletedUnlocked())
deletedUnlocked(false);
- drReplicate(drType, val, ver);
-
- long updateIdx = -1;
+ long updateIdx = 0;
if (!preload)
updateIdx = nextPartIndex(topVer);
+ drReplicate(drType, val, ver);
+
if (!skipQryNtf) {
cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, updateIdx, topVer);
@@ -4339,4 +4348,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return "IteratorEntry [key=" + key + ']';
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index ffda7a2..0f63777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -32,6 +32,9 @@ public class GridCacheUpdateTxResult {
@GridToStringInclude
private final CacheObject oldVal;
+ /** Partition idx. */
+ private long partIdx;
+
/**
* Constructor.
*
@@ -44,6 +47,31 @@ public class GridCacheUpdateTxResult {
}
/**
+ * Constructor.
+ *
+ * @param success Success flag.
+ * @param oldVal Old value (if any),
+ */
+ GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long partIdx) {
+ this.success = success;
+ this.oldVal = oldVal;
+ this.partIdx = partIdx;
+ }
+
+ /**
+ * Sets partition idx.
+ *
+ * @param partIdx Partition idx.
+ */
+ public void partIdx(long partIdx) {
+ this.partIdx = partIdx;
+ }
+
+ public long partIdx() {
+ return partIdx;
+ }
+
+ /**
* @return Success flag.
*/
public boolean success() {
@@ -61,4 +89,4 @@ public class GridCacheUpdateTxResult {
@Override public String toString() {
return S.toString(GridCacheUpdateTxResult.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index fcbf58d..c972f43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -281,6 +281,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
}
+ /** {@inheritDoc} */
+ @Override public void setPartitionUpdateIdx(long[] idxs) {
+ if (writeMap != null && !writeMap.isEmpty() && idxs != null && idxs.length > 0) {
+ int i = 0;
+
+ for (IgniteTxEntry txEntry : writeMap.values()) {
+ txEntry.partIdx(idxs[i]);
+
+ ++i;
+ }
+ }
+ }
+
/**
* Adds completed versions to an entry.
*
@@ -591,7 +604,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
replicate ? DR_BACKUP : DR_NONE,
near() ? null : explicitVer, CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ txEntry.partIdx());
else {
cached.innerSet(this,
eventNodeId(),
@@ -609,7 +623,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
near() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ txEntry.partIdx());
// Keep near entry up to date.
if (nearCached != null) {
@@ -638,7 +653,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
near() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ txEntry.partIdx());
// Keep near entry up to date.
if (nearCached != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 098a60d..4616b17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -302,6 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
+ cntrMap.clear();
+
// If this is the oldest node.
if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
if (node2part == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 992bd66..96459ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -376,6 +377,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
add(fut); // Append new future.
+ Collection<Long> updateIdxs = F.transform(dhtMapping.entries(), new C1<IgniteTxEntry, Long>() {
+ @Override public Long apply(IgniteTxEntry entry) {
+ assert entry != null;
+
+ return entry.partIdx();
+ }
+ });
+
GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
tx.nearNodeId(),
futId,
@@ -399,7 +408,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.size(),
tx.subjectId(),
tx.taskNameHash(),
- tx.activeCachesDeploymentEnabled());
+ tx.activeCachesDeploymentEnabled(),
+ updateIdxs);
req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index caa0aa5..18ac921 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -66,6 +67,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Check comitted flag. */
private boolean checkCommitted;
+ /** Partition update counter. */
+ @GridToStringInclude
+ @GridDirectCollection(Long.class)
+ private GridLongList partUpdateCnt;
+
/** One phase commit write version. */
private GridCacheVersion writeVer;
@@ -163,6 +169,74 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
+ * @param nearNodeId Near node ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param topVer Topology version.
+ * @param xidVer Transaction ID.
+ * @param threadId Thread ID.
+ * @param commitVer Commit version.
+ * @param isolation Transaction isolation.
+ * @param commit Commit flag.
+ * @param invalidate Invalidate flag.
+ * @param sys System flag.
+ * @param sysInvalidate System invalidation flag.
+ * @param syncCommit Synchronous commit flag.
+ * @param syncRollback Synchronous rollback flag.
+ * @param baseVer Base version.
+ * @param committedVers Committed versions.
+ * @param rolledbackVers Rolled back versions.
+ * @param pendingVers Pending versions.
+ * @param txSize Expected transaction size.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
+ * @param updateIdxs Partition update idxs.
+ */
+ public GridDhtTxFinishRequest(
+ UUID nearNodeId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ @NotNull AffinityTopologyVersion topVer,
+ GridCacheVersion xidVer,
+ GridCacheVersion commitVer,
+ long threadId,
+ TransactionIsolation isolation,
+ boolean commit,
+ boolean invalidate,
+ boolean sys,
+ byte plc,
+ boolean sysInvalidate,
+ boolean syncCommit,
+ boolean syncRollback,
+ GridCacheVersion baseVer,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers,
+ Collection<GridCacheVersion> pendingVers,
+ int txSize,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ Collection<Long> updateIdxs
+ ) {
+ this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
+ sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
+ subjId, taskNameHash);
+
+ if (updateIdxs != null && !updateIdxs.isEmpty()) {
+ partUpdateCnt = new GridLongList(updateIdxs.size());
+
+ for (Long idx : updateIdxs)
+ partUpdateCnt.add(idx);
+ }
+ }
+
+ /**
+ * @return Partition update counters.
+ */
+ public GridLongList partUpdateCounters(){
+ return partUpdateCnt;
+ }
+
+ /**
* @return Mini ID.
*/
public IgniteUuid miniId() {
@@ -329,6 +403,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
writer.incrementState();
+ case 28:
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -429,6 +508,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
+ case 28:
+ partUpdateCnt = reader.readMessage("partUpdateCnt");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDhtTxFinishRequest.class);
@@ -441,6 +528,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 28;
+ return 29;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index c537854..14c1b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -316,10 +316,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
else {
- locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-
- if (!skipPrimaryCheck)
- sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ if (!entry.filtered())
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
}
}
else {
@@ -560,13 +558,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
rec = oldRec;
}
- Collection<CacheContinuousQueryEntry> entries = rec.collectEntries(e);
-
- if (CacheContinuousQueryManager.SUPER_DEBUG)
- ctx.log(getClass()).error("Fire the following event for partition : " + e.partition() +
- " Entries: " + Arrays.toString(entries.toArray()));
-
- return entries;
+ return rec.collectEntries(e);
}
/**
@@ -608,9 +600,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
synchronized (pendingEnts) {
// Received first event.
if (lastFiredEvt == INIT_VALUE) {
- if (CacheContinuousQueryManager.SUPER_DEBUG)
- log.error("First event. " + entry);
-
lastFiredEvt = entry.updateIndex();
firedEvents.add(new T2<>(lastFiredEvt, entry));
@@ -624,29 +613,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
lastFiredEvt = 1;
- if (CacheContinuousQueryManager.SUPER_DEBUG)
- log.error("Lost partition. Start from 1. Entry: " + entry);
-
firedEvents.add(new T2<>(lastFiredEvt, entry));
return F.asList(entry);
}
// Check duplicate.
- if (entry.updateIndex() > lastFiredEvt) {
- if (CacheContinuousQueryManager.SUPER_DEBUG)
- log.error("Put message to pending queue. Counter value: " + lastFiredEvt + " Entry: " + entry);
-
+ if (entry.updateIndex() > lastFiredEvt)
pendingEnts.put(entry.updateIndex(), entry);
- }
else {
if (log.isDebugEnabled())
log.debug("Skip duplicate continuous query message: " + entry);
- if (CacheContinuousQueryManager.SUPER_DEBUG)
- log.error("Received duplicate. Counter value: " + lastFiredEvt + " Entry: " + entry
- + ", Proceed message " + Arrays.toString(firedEvents.toArray()));
-
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 16b40c7..65bb670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -65,7 +65,6 @@ import static javax.cache.event.EventType.EXPIRED;
import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
/**
@@ -87,8 +86,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/** */
private static final long BACKUP_ACK_FREQ = 5000;
- public static final boolean SUPER_DEBUG = false;
-
/** Listeners. */
private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 9eb2808..f5cf501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -181,6 +181,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
*/
private byte flags;
+ /** Partition update index. */
+ private long partIdx;
+
/** */
private GridCacheVersion serReadVer;
@@ -373,6 +376,22 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
+ * Sets partition index.
+ *
+ * @param partIdx Partition index.
+ */
+ public void partIdx(long partIdx) {
+ this.partIdx = partIdx;
+ }
+
+ /**
+ * @return Partition index.
+ */
+ public long partIdx() {
+ return partIdx;
+ }
+
+ /**
* @param val Value to set.
*/
void setAndMarkValid(CacheObject val) {
@@ -934,6 +953,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
+ case 12:
+ if (!writer.writeLong("partIdx", partIdx))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -1043,6 +1067,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
+ case 12:
+ partIdx = reader.readLong("partIdx");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1055,7 +1087,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 13;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d9786a8..631f9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -984,6 +984,9 @@ public class IgniteTxHandler {
// Complete remote candidates.
tx.doneRemote(req.baseVersion(), null, null, null);
+ tx.setPartitionUpdateIdx(
+ req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null);
+
tx.commit();
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 82e5f2a..6f7ae27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1025,7 +1025,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
+
+ if (updRes.success())
+ txEntry.partIdx(updRes.partIdx());
if (nearCached != null && updRes.success()) {
nearCached.innerSet(
@@ -1045,7 +1049,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
}
}
else if (op == DELETE) {
@@ -1063,7 +1068,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
+
+ if (updRes.success())
+ txEntry.partIdx(updRes.partIdx());
if (nearCached != null && updRes.success()) {
nearCached.innerRemove(
@@ -1080,7 +1089,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
}
}
else if (op == RELOAD) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 9660e4e..845f4f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -43,4 +43,9 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
* @return {@code True} if entry was found.
*/
public boolean setWriteValue(IgniteTxEntry e);
-}
\ No newline at end of file
+
+ /**
+ * @param idxs Partition update indexes.
+ */
+ public void setPartitionUpdateIdx(long[] idxs);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index c7676d2..3ed186e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
@@ -74,9 +73,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 9ee6fe7..110b9a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -479,7 +479,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable GridCacheVersion drVer,
UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer)
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateIdx)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
}
@@ -553,7 +554,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable GridCacheVersion drVer,
UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updatePartIdx
) throws IgniteCheckedException, GridCacheEntryRemovedException {
obsoleteVer = ver;
@@ -896,4 +898,4 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Override public void onUnlock() {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index ca754af..6029761 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
+import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
@@ -55,6 +56,7 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -84,6 +86,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -297,7 +300,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
- List<Integer> keys = testKeys(srvCache, 1);
+ List<Integer> keys = testKeys(srvCache, 3);
int keyCnt = keys.size();
@@ -371,6 +374,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
public void testLeftPrimaryAndBackupNodes() throws Exception {
+ if (cacheMode() == REPLICATED)
+ return;
+
this.backups = 1;
final int SRV_NODES = 3;
@@ -485,7 +491,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
- 1 /** Primary node */ - backups;
}
- }, 10000L);
+ }, 5000L);
for (; keyIter < keys.size(); keyIter++) {
int key = keys.get(keyIter);
@@ -560,7 +566,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
- for (int i = 0; i < SRV_NODES - 1; i++) {
+ for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
log.info("Stop iteration: " + i);
TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
@@ -654,7 +660,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
- this.backups = backups;
+ this.backups = atomicityMode() == CacheAtomicityMode.ATOMIC ? backups :
+ backups < 2 ? 2 : backups;
final int SRV_NODES = 4;
@@ -668,9 +675,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
- if (cacheMode() != REPLICATED)
- assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
-
Affinity<Object> aff = qryClient.affinity(null);
CacheEventListener1 lsnr = new CacheEventListener1(false);
@@ -687,7 +691,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
- for (int i = 0; i < SRV_NODES - 1; i++) {
+ for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
log.info("Stop iteration: " + i);
TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
@@ -709,6 +713,39 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
T2<Object, Object> t = updates.get(key);
+ if (updateFromClient) {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = qryClient.transactions().txStart()) {
+ qryClientCache.put(key, key);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
+
+ continue;
+ }
+ }
+ else
+ qryClientCache.put(key, key);
+ }
+ else {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = ignite.transactions().txStart()) {
+ cache.put(key, key);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
+
+ continue;
+ }
+ }
+ else
+ cache.put(key, key);
+ }
+
if (t == null) {
updates.put(key, new T2<>((Object)key, null));
@@ -720,11 +757,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
}
- if (updateFromClient)
- qryClientCache.put(key, key);
- else
- cache.put(key, key);
-
if (first) {
spi.skipMsg = true;
@@ -747,7 +779,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
checkEvents(expEvts, lsnr);
}
- for (int i = 0; i < SRV_NODES - 1; i++) {
+ for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
log.info("Start iteration: " + i);
Ignite ignite = startGrid(i);
@@ -782,7 +814,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
cache.put(key, key);
}
- if (!latch.await(5, SECONDS)) {
+ if (!latch.await(10, SECONDS)) {
Set<Integer> keys0 = new HashSet<>(keys);
keys0.removeAll(lsnr.keys);
@@ -824,7 +856,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
*/
private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
boolean lostAllow) throws Exception {
- boolean b = GridTestUtils.waitForCondition(new PA() {
+ GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return expEvts.size() == lsnr.size();
}
@@ -910,7 +942,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
for (T3<Object, Object, Object> e : lostEvents)
log.error("Lost event: " + e);
- assertTrue("Lose events, see log for details.", false);
+ fail("Lose events, see log for details.");
}
log.error("Lost event cnt: " + lostEvents.size());
@@ -1155,17 +1187,19 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
public void testFailover() throws Exception {
+ this.backups = 2;
+
final int SRV_NODES = 4;
startGridsMultiThreaded(SRV_NODES);
client = true;
- Ignite qryClient = startGrid(SRV_NODES);
+ final Ignite qryCln = startGrid(SRV_NODES);
client = false;
- IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+ final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
final CacheEventListener2 lsnr = new CacheEventListener2();
@@ -1173,7 +1207,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
qry.setLocalListener(lsnr);
- QueryCursor<?> cur = qryClientCache.query(qry);
+ QueryCursor<?> cur = qryClnCache.query(qry);
final AtomicBoolean stop = new AtomicBoolean();
@@ -1194,7 +1228,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
log.info("Stop node: " + idx);
- stopGrid(idx);
+ try {
+ stopGrid(idx);
+ }
+ catch (Exception e) {
+ log.warning("Failed to stop nodes.", e);
+ }
CountDownLatch latch = new CountDownLatch(1);
@@ -1216,9 +1255,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
try {
- long stopTime = System.currentTimeMillis() + 1 * 60_000;
+ long stopTime = System.currentTimeMillis() + 60_000;
- final int PARTS = qryClient.affinity(null).partitions();
+ final int PARTS = qryCln.affinity(null).partitions();
ThreadLocalRandom rnd = ThreadLocalRandom.current();
@@ -1234,17 +1273,51 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
val = val + 1;
if (processorPut && prevVal != null) {
- qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
- @Override public Void process(MutableEntry<Object, Object> entry,
- Object... arguments) throws EntryProcessorException {
- entry.setValue(arguments[0]);
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = qryCln.transactions().txStart()) {
+ qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> e,
+ Object... arg) throws EntryProcessorException {
+ e.setValue(arg[0]);
+
+ return null;
+ }
+ }, val);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
- return null;
+ continue;
}
- }, val);
+ }
+ else
+ qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> e,
+ Object... arg) throws EntryProcessorException {
+ e.setValue(arg[0]);
+
+ return null;
+ }
+ }, val);
+ }
+ else {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = qryCln.transactions().txStart()) {
+ qryClnCache.put(key, val);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
+
+ continue;
+ }
+ }
+ else
+ qryClnCache.put(key, val);
}
- else
- qryClientCache.put(key, val);
processorPut = !processorPut;
@@ -1306,11 +1379,14 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
restartFut.get();
- boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return checkEvents(false, expEvts, lsnr);
- }
- }, 10_000);
+ boolean check = true;
+
+ if (!expEvts.isEmpty())
+ check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, expEvts, lsnr);
+ }
+ }, 10_000);
if (!check)
assertTrue(checkEvents(true, expEvts, lsnr));
@@ -1324,6 +1400,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
public void testFailoverFilter() throws Exception {
+ this.backups = 2;
+
final int SRV_NODES = 4;
startGridsMultiThreaded(SRV_NODES);
@@ -1385,7 +1463,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
try {
- long stopTime = System.currentTimeMillis() + 1 * 60_000;
+ long stopTime = System.currentTimeMillis() + 60_000;
final int PARTS = qryClient.affinity(null).partitions();
@@ -1510,15 +1588,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @throws Exception If failed.
*/
- public void testFailoverStartStopOneBackup() throws Exception {
- failoverStartStopFilter(1);
+ public void testFailoverStartStopBackup() throws Exception {
+ failoverStartStopFilter(atomicityMode() == CacheAtomicityMode.ATOMIC ? 1 : 2);
}
/**
* @throws Exception If failed.
*/
- public void _testStartStop() throws Exception {
- this.backups = 0;
+ public void testStartStop() throws Exception {
+ this.backups = 2;
final int SRV_NODES = 4;
@@ -1532,6 +1610,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+ Affinity<Object> aff = qryClient.affinity(null);
+
final CacheEventListener2 lsnr = new CacheEventListener2();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
@@ -1542,18 +1622,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
QueryCursor<?> cur = qryClnCache.query(qry);
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 20; i++) {
final int idx = i % (SRV_NODES - 1);
log.info("Stop node: " + idx);
stopGrid(idx);
- Thread.sleep(200);
+ awaitPartitionMapExchange();
List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
- for (int j = 0; j < 10; j++) {
+ for (int j = 0; j < aff.partitions(); j++) {
Integer oldVal = (Integer)qryClnCache.get(j);
qryClnCache.put(j, i);
@@ -1646,7 +1726,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
try {
- long stopTime = System.currentTimeMillis() + 60_000;
+ long stopTime = System.currentTimeMillis() + 10_000;
// Start new filter each 5 sec.
long startFilterTime = System.currentTimeMillis() + 5_000;
@@ -1785,13 +1865,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
dinLsnr.evts.clear();
dinLsnr.vals.clear();
-
- dinQry.close();
}
List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
- for (int i = 0; i < 1024; i++) {
+ for (int i = 0; i < qryClient.affinity(null).partitions(); i++) {
Integer oldVal = (Integer)qryClnCache.get(i);
qryClnCache.put(i, i);
@@ -1801,12 +1879,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
- //checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
-
cur.close();
- if (dinQry != null)
+ if (dinQry != null) {
+ checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
+
dinQry.close();
+ }
assertFalse("Unexpected error during test, see log for details.", err);
}
@@ -1815,6 +1894,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
public void testMultiThreaded() throws Exception {
+ this.backups = 2;
+
final int SRV_NODES = 3;
startGridsMultiThreaded(SRV_NODES);
@@ -1957,8 +2038,24 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
T2<Integer, Integer> expEvt = exp.get(i);
CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
- assertEquals(key, rcvdEvt.getKey());
- assertEquals(expEvt.get1(), rcvdEvt.getValue());
+ if (pass) {
+ assertEquals(key, rcvdEvt.getKey());
+ assertEquals(expEvt.get1(), rcvdEvt.getValue());
+ }
+ else {
+ if (!key.equals(rcvdEvt.getKey()) || !expEvt.get1().equals(rcvdEvt.getValue()))
+ log.warning("Missed events. [key=" + key + ", actKey=" + rcvdEvt.getKey()
+ + ", expVal=" + expEvt.get1() + ", actVal=" + rcvdEvt.getValue() + "]");
+ }
+ }
+
+ if (!pass) {
+ for (int i = cnt; i < exp.size(); i++) {
+ T2<Integer, Integer> val = exp.get(i);
+
+ log.warning("Missed events. [key=" + key + ", expVal=" + val.get1()
+ + ", prevVal=" + val.get2() + "]");
+ }
}
}
}
@@ -2168,7 +2265,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
if (msg0 instanceof GridContinuousMessage) {
if (skipMsg) {
- log.info("Skip continuous message: " + msg0);
+ if (log.isDebugEnabled())
+ log.debug("Skip continuous message: " + msg0);
return;
}
@@ -2176,7 +2274,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
AtomicBoolean sndFirstOnly = this.sndFirstOnly;
if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) {
- log.info("Skip continuous message: " + msg0);
+ if (log.isDebugEnabled())
+ log.debug("Skip continuous message: " + msg0);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
index 4ddcf0d..8bd7ea7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
@@ -18,15 +18,27 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
/**
*
*/
-public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest {
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAbstractTest {
/** {@inheritDoc} */
@Override protected CacheAtomicWriteOrderMode writeOrderMode() {
return PRIMARY;
}
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
index 8fc58d3..db5b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
@@ -26,7 +26,8 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
*
*/
-public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest {
+public class CacheContinuousQueryFailoverAtomicReplicatedTest
+ extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest {
/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return REPLICATED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
deleted file mode 100644
index fb50387..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-
-/**
- *
- */
-public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest {
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return PARTITIONED;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return ATOMIC;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
new file mode 100644
index 0000000..560f2e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.resources.LoggerResource;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryClientReconnectTest extends IgniteClientReconnectAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(atomicMode());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * @return Atomic mode.
+ */
+ protected CacheAtomicityMode atomicMode() {
+ return ATOMIC;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectClient() throws Exception {
+ Ignite client = grid(serverCount());
+
+ Ignite srv = clientRouter(client);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ IgniteCache<Object, Object> clnCache = client.cache(null);
+
+ QueryCursor<?> cur = clnCache.query(qry);
+
+ int keyCnt = 100;
+
+ for (int i = 0; i < 30; i++) {
+ lsnr.latch = new CountDownLatch(keyCnt);
+
+ for (int key = 0; key < keyCnt; key++)
+ clnCache.put(key, key);
+
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+ reconnectClientNode(client, srv, null);
+
+ lsnr.latch = new CountDownLatch(keyCnt);
+
+ for (int key = 0; key < keyCnt; key++)
+ clnCache.put(key, key);
+
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+ }
+
+ cur.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectClientAndLeftRouter() throws Exception {
+ Ignite client = grid(serverCount());
+
+ final Ignite srv = clientRouter(client);
+
+ final String clnRouterName = srv.name();
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ IgniteCache<Object, Object> clnCache = client.cache(null);
+
+ QueryCursor<?> cur = clnCache.query(qry);
+
+ int keyCnt = 100;
+
+ lsnr.latch = new CountDownLatch(keyCnt);
+
+ for (int key = 0; key < keyCnt; key++)
+ clnCache.put(key, key);
+
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ stopGrid(clnRouterName);
+ }
+ });
+
+ assertFalse("Client connected to the same server node.", clnRouterName.equals(clientRouter(client).name()));
+
+ lsnr.latch = new CountDownLatch(keyCnt);
+
+ for (int key = 0; key < keyCnt; key++)
+ clnCache.put(key, key);
+
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+ cur.close();
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+ /** */
+ private volatile CountDownLatch latch = new CountDownLatch(1);
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ log.info("Received cache event: " + evt);
+
+ latch.countDown();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index 1afeb05..534f298 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -27,11 +27,13 @@ import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -83,11 +85,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
client = true;
- Ignite clientNode = startGrid(3);
+ final int CLIENT_ID = 3;
+
+ Ignite clientNode = startGrid(CLIENT_ID);
client = false;
- CacheEventListener lsnr = new CacheEventListener();
+ final CacheEventListener lsnr = new CacheEventListener();
ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
@@ -95,27 +99,154 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
QueryCursor<?> cur = clientNode.cache(null).query(qry);
- Ignite joined1 = startGrid(4);
+ for (int i = 0; i < 10; i++) {
+ log.info("Start iteration: " + i);
+
+ lsnr.latch = new CountDownLatch(1);
+
+ Ignite joined1 = startGrid(4);
+
+ IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+
+ joinedCache1.put(primaryKey(joinedCache1), 1);
+
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+ lsnr.latch = new CountDownLatch(1);
+
+ Ignite joined2 = startGrid(5);
+
+ IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
- IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+ joinedCache2.put(primaryKey(joinedCache2), 2);
- joinedCache1.put(primaryKey(joinedCache1), 1);
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
- assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+ stopGrid(4);
+
+ stopGrid(5);
+ }
cur.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNodeJoinsRestartQuery() throws Exception {
+ startGrids(2);
+
+ client = true;
+
+ final int CLIENT_ID = 3;
+
+ Ignite clientNode = startGrid(CLIENT_ID);
+
+ client = false;
+
+ for (int i = 0; i < 10; i++) {
+ log.info("Start iteration: " + i);
+
+ final CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = clientNode.cache(null).query(qry);
+
+ lsnr.latch = new CountDownLatch(1);
+
+ Ignite joined1 = startGrid(4);
+
+ IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+
+ joinedCache1.put(primaryKey(joinedCache1), 1);
+
+ assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+ cur.close();
- lsnr.latch = new CountDownLatch(1);
+ lsnr.latch = new CountDownLatch(1);
- Ignite joined2 = startGrid(5);
+ Ignite joined2 = startGrid(5);
- IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
+ IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
- joinedCache2.put(primaryKey(joinedCache2), 2);
+ joinedCache2.put(primaryKey(joinedCache2), 2);
- U.sleep(1000);
+ assertFalse("Unexpected event received.", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return 1 != lsnr.latch.getCount();
+ }
+ }, 1000));
- assertEquals("Unexpected event received.", 1, lsnr.latch.getCount());
+ stopGrid(4);
+
+ stopGrid(5);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeLeft() throws Exception {
+ startGrids(3);
+
+ client = true;
+
+ final int CLIENT_ID = 3;
+
+ Ignite clnNode = startGrid(CLIENT_ID);
+
+ client = false;
+
+ IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache =
+ new IgniteOutClosure<IgniteCache<Integer, Integer>>() {
+ int cnt = 0;
+
+ @Override public IgniteCache<Integer, Integer> apply() {
+ ++cnt;
+
+ return grid(CLIENT_ID).cache(null);
+ }
+ };
+
+ final CacheEventListener lsnr = new CacheEventListener();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = clnNode.cache(null).query(qry);
+
+ boolean first = true;
+
+ int keyCnt = 1;
+
+ for (int i = 0; i < 10; i++) {
+ log.info("Start iteration: " + i);
+
+ if (first)
+ first = false;
+ else {
+ for (int srv = 0; srv < CLIENT_ID - 1; srv++)
+ startGrid(srv);
+ }
+
+ lsnr.latch = new CountDownLatch(keyCnt);
+
+ for (int key = 0; key < keyCnt; key++)
+ rndCache.apply().put(key, key);
+
+ assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(),
+ lsnr.latch.await(10, SECONDS));
+
+ for (int srv = 0; srv < CLIENT_ID - 1; srv++)
+ stopGrid(srv);
+ }
+
+ cur.close();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java
new file mode 100644
index 0000000..a10ebc9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryClientTxReconnectTest extends IgniteCacheContinuousQueryClientReconnectTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicMode() {
+ return TRANSACTIONAL;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/26408c4b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6cb1a52..91dc388 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -65,6 +65,10 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
@@ -77,7 +81,9 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
@@ -160,8 +166,14 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
- suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);