You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2018/11/20 12:29:03 UTC
ignite git commit: IGNITE-9828 MVCC: Notify Continuous Query manager
about missed updates during PME. This closes #5189.
Repository: ignite
Updated Branches:
refs/heads/master eb8888561 -> 74f312e0c
IGNITE-9828 MVCC: Notify Continuous Query manager about missed updates during PME. This closes #5189.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74f312e0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74f312e0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74f312e0
Branch: refs/heads/master
Commit: 74f312e0c863824f1cefdd589a7a0d7da140a276
Parents: eb88885
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Nov 20 15:26:31 2018 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Tue Nov 20 15:26:31 2018 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 12 +-
.../GridCachePartitionExchangeManager.java | 4 +-
.../cache/IgniteCacheOffheapManager.java | 4 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 4 +-
.../cache/PartitionUpdateCounter.java | 28 +-
.../GridDistributedTxRemoteAdapter.java | 2 +
.../dht/GridDhtTransactionalCacheAdapter.java | 53 ++-
.../dht/GridDhtTxAbstractEnlistFuture.java | 10 +-
.../GridDhtPartitionsExchangeFuture.java | 49 +++
.../topology/GridClientPartitionTopology.java | 8 +-
.../dht/topology/GridDhtLocalPartition.java | 7 +-
.../dht/topology/GridDhtPartitionTopology.java | 9 +-
.../topology/GridDhtPartitionTopologyImpl.java | 51 ++-
.../cache/mvcc/MvccCachingManager.java | 30 +-
.../persistence/GridCacheOffheapManager.java | 5 +-
.../continuous/CacheContinuousQueryManager.java | 33 ++
.../cache/transactions/IgniteTxAdapter.java | 3 +
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../continuous/GridContinuousProcessor.java | 6 +-
...tinuousQueryAsyncFailoverMvccTxSelfTest.java | 53 +++
...ContinuousQueryFailoverAbstractSelfTest.java | 90 ++++-
...usQueryFailoverMvccTxReplicatedSelfTest.java | 31 ++
...heContinuousQueryFailoverMvccTxSelfTest.java | 48 +++
.../mvcc/CacheMvccBasicContinuousQueryTest.java | 363 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite4.java | 6 +
25 files changed, 821 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9124884..96f4dcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -6733,12 +6733,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ensureFreeSpace();
+ CacheObject val = null;
+ CacheObject oldVal = null;
+
lockEntry();
try {
checkObsolete();
- CacheObject val = null;
for (int i = 0; i < entries.size(); i++) {
GridCacheMvccEntryInfo info = (GridCacheMvccEntryInfo)entries.get(i);
@@ -6759,7 +6761,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
info.newMvccTxState());
}
- CacheObject oldVal = null;
// Assume list contains only previous committed row and rows changed by the current tx.
if (!entries.isEmpty()) {
@@ -6806,7 +6807,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
- return new GridCacheUpdateTxResult(true, logPtr);
+ GridCacheUpdateTxResult res = new GridCacheUpdateTxResult(true, logPtr);
+
+ res.newValue(val);
+ res.oldValue(oldVal);
+
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 55c5d6c..fa63b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1475,7 +1475,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
grp.affinity().similarAffinityKey());
if (sndCounters) {
- CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true, true);
+ CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true);
m.addPartitionUpdateCounters(grp.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
@@ -1499,7 +1499,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top.similarAffinityKey());
if (sndCounters) {
- CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true, true);
+ CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true);
m.addPartitionUpdateCounters(top.groupId(),
newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index d311708..2cf302f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1117,8 +1117,10 @@ public interface IgniteCacheOffheapManager {
/**
* Flushes pending update counters closing all possible gaps.
+ *
+ * @return Even-length array of pairs [start, end] for each gap.
*/
- public void finalizeUpdateCountres();
+ GridLongList finalizeUpdateCounters();
/**
* Preload a store into page memory.
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5f467b3..08ce978 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1580,8 +1580,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void finalizeUpdateCountres() {
- pCntr.finalizeUpdateCountres();
+ @Override public GridLongList finalizeUpdateCounters() {
+ return pCntr.finalizeUpdateCounters();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index fe44708..39d8d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.GridLongList;
import org.jetbrains.annotations.NotNull;
/**
@@ -180,14 +181,31 @@ public class PartitionUpdateCounter {
/**
* Flushes pending update counters closing all possible gaps.
+ *
+ * @return Even-length array of pairs [start, end] for each gap.
*/
- public synchronized void finalizeUpdateCountres() {
- Item last = queue.pollLast();
+ public synchronized GridLongList finalizeUpdateCounters() {
+ Item item = poll();
+
+ GridLongList gaps = null;
+
+ while (item != null) {
+ if (gaps == null)
+ gaps = new GridLongList((queue.size() + 1) * 2);
+
+ long start = cntr.get() + 1;
+ long end = item.start;
- if (last != null)
- update(last.start + last.delta);
+ gaps.add(start);
+ gaps.add(end);
+
+ // Close pending ranges.
+ update(item.start + item.delta);
+
+ item = poll();
+ }
- queue.clear();
+ return gaps;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c93e771..a915478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -935,6 +935,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
catch (IgniteCheckedException | RuntimeException | Error e) {
state(UNKNOWN);
+ U.error(log, "Error during tx rollback.", e);
+
if (e instanceof IgniteCheckedException)
throw new IgniteException(e);
else if (e instanceof RuntimeException)
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 60463b4..4693232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1960,23 +1960,24 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.subjectId(),
req.taskNameHash());
}
- catch (IgniteCheckedException | IgniteException ex) {
+ catch (Throwable e) {
GridNearTxQueryResultsEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(req.cacheId(),
req.futureId(),
req.miniId(),
req.version(),
- ex);
+ e);
try {
ctx.io().send(nearNode, res, ctx.ioPolicy());
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send near enlist response [" +
- "txId=" + req.version() +
- ", node=" + nodeId +
- ", res=" + res + ']', e);
+ catch (IgniteCheckedException ioEx) {
+ U.error(log, "Failed to send near enlist response " +
+ "[txId=" + req.version() + ", node=" + nodeId + ", res=" + res + ']', ioEx);
}
+ if (e instanceof Error)
+ throw (Error) e;
+
return;
}
@@ -2231,26 +2232,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
/**
* @param primary Primary node.
- * @param req Request.
- * @param e Error.
- */
- private void onError(UUID primary, GridDhtTxQueryEnlistRequest req, Throwable e) {
- GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(ctx.cacheId(),
- req.dhtFutureId(),
- req.batchId(),
- e);
-
- try {
- ctx.io().send(primary, res, ctx.ioPolicy());
- }
- catch (IgniteCheckedException ioEx) {
- U.error(log, "Failed to send DHT enlist reply to primary node [node: " + primary + ", req=" + req +
- ']', ioEx);
- }
- }
-
- /**
- * @param primary Primary node.
* @param req Message.
* @param first Flag if this is a first request in current operation.
*/
@@ -2321,8 +2302,22 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req + ']', ioEx);
}
}
- catch (IgniteCheckedException e) {
- onError(primary, req, e);
+ catch (Throwable e) {
+ GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(ctx.cacheId(),
+ req.dhtFutureId(),
+ req.batchId(),
+ e);
+
+ try {
+ ctx.io().send(primary, res, ctx.ioPolicy());
+ }
+ catch (IgniteCheckedException ioEx) {
+ U.error(log, "Failed to send DHT enlist reply to primary node " +
+ "[node: " + primary + ", req=" + req + ']', ioEx);
+ }
+
+ if (e instanceof Error)
+ throw (Error) e;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index cfa8eb7..9949930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -629,7 +629,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1);
if (op != EnlistOperation.LOCK)
- addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId());
+ addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), op);
}
/**
@@ -642,7 +642,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @param cacheId Cache Id.
*/
private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist,
- int cacheId) throws IgniteCheckedException {
+ int cacheId, EnlistOperation op) throws IgniteCheckedException {
List<ClusterNode> backups = backupNodes(key);
int part = cctx.affinity().partition(key);
@@ -680,13 +680,15 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
if (batch == null)
batches.put(node.id(), batch = new Batch(node));
- if (moving && hist0 == null) {
+ if (moving && hist0 == null && !op.isInvoke()) {
assert !F.isEmpty(hist);
hist0 = fetchHistoryInfo(key, hist);
}
- batch.add(key, moving ? hist0 : val);
+ Message m = moving && !op.isInvoke() ? hist0 : val;
+
+ batch.add(key, m);
if (batch.size() == BATCH_SIZE) {
assert batches != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a79c95f..44fc266 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1466,6 +1466,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
changeWalModeIfNeeded();
+ if (events().hasServerLeft())
+ finalizePartitionCounters();
+
cctx.exchange().exchangerBlockingSectionBegin();
try {
@@ -3465,6 +3468,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Collects non local cache groups.
+ *
+ * @return Collection of non local cache groups.
+ */
+ private List<CacheGroupContext> nonLocalCacheGroups() {
+ return cctx.cache().cacheGroups().stream()
+ .filter(grp -> !grp.isLocal() && !cacheGroupStopping(grp.groupId()))
+ .collect(Collectors.toList());
+ }
+
+ /**
* Validates that partition update counters and cache sizes for all caches are consistent.
*/
private void validatePartitionsState() {
@@ -3548,6 +3562,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Removes gaps in the local update counters. Gaps in update counters are possible on a backup node when the
+ * primary node failed to send update counter deltas to that backup.
+ */
+ private void finalizePartitionCounters() {
+ long time = System.currentTimeMillis();
+
+ try {
+ int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
+
+ // Reserve at least 2 threads for system operations.
+ parallelismLvl = Math.max(1, parallelismLvl - 2);
+
+ if (parallelismLvl > 1) {
+ U.doInParallel(parallelismLvl,
+ cctx.kernalContext().getSystemExecutorService(),
+ nonLocalCacheGroups(),
+ grp -> {
+ grp.topology().finalizeUpdateCounters();
+
+ return null;
+ }
+ );
+ }
+ else
+ nonLocalCacheGroups().forEach(grp -> grp.topology().finalizeUpdateCounters());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to finalize partition counters", e);
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Partition counters finalization performed in " + (System.currentTimeMillis() - time) + " ms.");
+ }
+
+ /**
* @param finishState State.
* @param msg Request.
* @param nodeId Node ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 01db508..b1d1048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1232,12 +1232,16 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
- boolean finalizeCntrsBeforeCollecting) {
+ @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
return CachePartitionPartialCountersMap.EMPTY;
}
/** {@inheritDoc} */
+ @Override public void finalizeUpdateCounters() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public Map<Integer, Long> partitionSizes() {
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 8ee7a9d..f3516a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntry
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -1362,9 +1363,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
/**
* Flushes pending update counters closing all possible gaps.
+ *
+ * @return Even-length array of pairs [start, end] for each gap.
*/
- public void finalizeUpdateCountres() {
- store.finalizeUpdateCountres();
+ public GridLongList finalizeUpdateCounters() {
+ return store.finalizeUpdateCounters();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 25b284e..be8a789 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -345,17 +345,20 @@ public interface GridDhtPartitionTopology {
public Collection<Integer> lostPartitions();
/**
+ * Pre-processes partition update counters before exchange.
+ */
+ void finalizeUpdateCounters();
+
+ /**
* @return Partition update counters.
*/
public CachePartitionFullCountersMap fullUpdateCounters();
/**
* @param skipZeros {@code True} for adding zero counter to map.
- * @param finalizeCntrsBeforeCollecting {@code True} indicates that partition counters should be finalized.
* @return Partition update counters.
*/
- public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
- boolean finalizeCntrsBeforeCollecting);
+ public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
/**
* @return Partition cache sizes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 6418dc7..a127876 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -2670,6 +2672,49 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ /**
+ * Pre-processes partition update counters before exchange.
+ */
+ @Override public void finalizeUpdateCounters() {
+ if (!grp.mvccEnabled())
+ return;
+
+ // The checkpoint read lock must be acquired before the topology lock.
+ ctx.database().checkpointReadLock();
+
+ try {
+ lock.readLock().lock();
+
+ try {
+ for (int i = 0; i < locParts.length(); i++) {
+ GridDhtLocalPartition part = locParts.get(i);
+
+ if (part != null && part.state().active()) {
+ // We need to close all gaps in the partition update counter sequence. We assume this finalization
+ // happens on exchange and hence all txs are completed. Therefore each gap in the update counter
+ // sequence is the result of an undelivered DhtTxFinishMessage on a backup (sequences on primary nodes
+ // do not have gaps). Here we close these gaps and asynchronously notify the continuous query engine
+ // about the skipped events.
+ AffinityTopologyVersion topVer = ctx.exchange().readyAffinityVersion();
+
+ GridLongList gaps = part.finalizeUpdateCounters();
+
+ if (gaps != null) {
+ for (GridCacheContext ctx0 : grp.caches())
+ ctx0.continuousQueries().closeBackupUpdateCountersGaps(ctx0, part.id(), topVer, gaps);
+ }
+ }
+ }
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+
/** {@inheritDoc} */
@Override public CachePartitionFullCountersMap fullUpdateCounters() {
lock.readLock().lock();
@@ -2683,8 +2728,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
- boolean finalizeCntrsBeforeCollecting) {
+ @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
lock.readLock().lock();
try {
@@ -2705,9 +2749,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- if (finalizeCntrsBeforeCollecting)
- part.finalizeUpdateCountres();
-
long updCntr = part.updateCounter();
long initCntr = part.initialUpdateCounter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
index 259f69b..8f83b6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
@@ -40,8 +40,10 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -146,16 +148,23 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
Map<KeyCacheObject, MvccTxEntry> cached = buf.getCached();
- if (F.isEmpty(cached) || !commit)
+ if (F.isEmpty(cached))
return;
TxCounters txCntrs = tx.txCounters(false);
- assert txCntrs != null;
+ assert txCntrs != null || !commit;
- Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs.updateCounters();
+ if (txCntrs == null)
+ return;
+
+ Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs.updateCounters();
- assert !F.isEmpty(cntrsColl) : cntrsColl;
+ if (F.isEmpty(cntrsColl)) {
+ assert !commit;
+
+ return;
+ }
// cacheId -> partId -> initCntr -> cntr + delta.
Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<>();
@@ -219,8 +228,8 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
contQryMgr.onEntryUpdated(
lsnrCol,
e.key(),
- e.value(),
- e.oldValue(),
+ commit ? e.value() : null, // Force skip update counter if rolled back.
+ commit ? e.oldValue() : null, // Force skip update counter if rolled back.
false,
e.key().partition(),
tx.local(),
@@ -253,7 +262,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
* @return Map of listeners to be notified by this update.
*/
public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0, @Nullable IgniteInternalTx tx, KeyCacheObject key) {
- boolean internal = key.internal() || !ctx0.userCache();
+ boolean internal = key != null && key.internal() || !ctx0.userCache();
return ctx0.continuousQueries().notifyContinuousQueries(tx) ?
ctx0.continuousQueries().updateListeners(internal, false) : null;
@@ -268,9 +277,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
private IgniteUuid lastFutId;
/** Main buffer for entries. */
+ @GridToStringInclude
private Map<KeyCacheObject, MvccTxEntry> cached = new LinkedHashMap<>();
/** Pending entries. */
+ @GridToStringInclude
private SortedMap<Integer, Map<KeyCacheObject, MvccTxEntry>> pending;
/**
@@ -337,5 +348,10 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
pending.clear();
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(EnlistBuffer.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 48e86e4..f24900f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1722,12 +1722,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public void finalizeUpdateCountres() {
+ @Override public GridLongList finalizeUpdateCounters() {
try {
CacheDataStore delegate0 = init0(true);
- if (delegate0 != null)
- delegate0.finalizeUpdateCountres();
+ return delegate0 != null ? delegate0.finalizeUpdateCounters() : null;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6bd3fc2..4c399bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
@@ -67,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI2;
@@ -275,6 +277,37 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * Closes gaps in backup update counters: for every counter inside a gap, notifies CQ listeners
+ * that the counter should be skipped.
+ *
+ * @param cctx Cache context.
+ * @param part Partition.
+ * @param topVer Topology version.
+ * @param gaps Even-length array of pairs [start, end] for each gap.
+ */
+ @Nullable public void closeBackupUpdateCountersGaps(GridCacheContext cctx,
+ int part,
+ AffinityTopologyVersion topVer,
+ GridLongList gaps) {
+ assert gaps != null && gaps.size() % 2 == 0;
+
+ for (int i = 0; i < gaps.size() / 2; i++) {
+ long gapStart = gaps.get(i * 2);
+ long gapStop = gaps.get(i * 2 + 1);
+
+ /*
+ * No user listeners should be called by this invocation. In the common case of a partitioned cache, or a
+ * replicated cache with a non-local-only listener, gaps (dummy filtered CQ events) will be added to the
+ * backup queue without being passed to any listener. In the special case of a local-only listener on a
+ * replicated cache, no backup queues are used at all and therefore no gaps occur - all unfiltered
+ * events are passed to listeners upon arrival.
+ */
+ for (long cntr = gapStart; cntr <= gapStop; cntr++)
+ skipUpdateEvent(lsnrs, null, part, cntr, false, topVer);
+ }
+ }
+
+ /**
* @param internal Internal entry flag (internal key or not user cache).
* @param preload Whether update happened during preloading.
* @return Registered listeners.
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 8b24e01..52659e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -273,6 +273,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
protected ConsistentIdMapper consistentIdMapper;
/** Mvcc tx update snapshot. */
+ @GridToStringInclude
protected volatile MvccSnapshot mvccSnapshot;
/** Rollback finish future. */
@@ -280,6 +281,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
private volatile IgniteInternalFuture rollbackFut;
/** */
+ @SuppressWarnings("unused")
+ @GridToStringExclude
private volatile TxCounters txCounters;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d715362..3685f7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1895,7 +1895,7 @@ public class IgniteTxHandler {
invokeArgs = invokeVal.invokeArgs();
}
- assert entryProc != null || !op.isInvoke();
+ assert entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fbfd99b..4569f65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1423,7 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (cache != null && !cache.isLocal() && cache.context().userCache())
req.addUpdateCounters(ctx.localNodeId(),
- toCountersMap(cache.context().topology().localUpdateCounters(false, false)));
+ toCountersMap(cache.context().topology().localUpdateCounters(false)));
}
}
@@ -1564,7 +1564,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (cache != null && !cache.isLocal() && cache.context().userCache()) {
CachePartitionPartialCountersMap cntrsMap =
- cache.context().topology().localUpdateCounters(false, false);
+ cache.context().topology().localUpdateCounters(false);
cntrs = U.marshal(marsh, cntrsMap);
}
@@ -2504,7 +2504,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
cntrsPerNode.put(ctx.localNodeId(),
- toCountersMap(cctx.topology().localUpdateCounters(false, false)));
+ toCountersMap(cctx.topology().localUpdateCounters(false)));
routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java
new file mode 100644
index 0000000..334d219
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverMvccTxSelfTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
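+ * Continuous query failover test for a PARTITIONED cache in TRANSACTIONAL_SNAPSHOT (MVCC) mode with async callbacks.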
+ */
+public class CacheContinuousQueryAsyncFailoverMvccTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean asyncCallback() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testBackupQueueEvict() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testLeftPrimaryAndBackupNodes() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10047");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 0fef7b2..e7c2261 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -74,8 +74,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
@@ -104,6 +105,7 @@ import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -111,6 +113,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
@@ -521,7 +525,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology()
- .localUpdateCounters(false, false);
+ .localUpdateCounters(false);
for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {
@@ -760,6 +764,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
}
}, 5000L);
+ awaitPartitionMapExchange();
+
for (; keyIter < keys.size(); keyIter++) {
int key = keys.get(keyIter);
@@ -784,7 +790,18 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
}
- clnCache.put(key, val);
+ boolean updated = false;
+
+ while (!updated) {
+ try {
+ clnCache.put(key, val);
+
+ updated = true;
+ }
+ catch (Exception ignore) {
+ assertEquals(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, atomicityMode());
+ }
+ }
filtered = !filtered;
}
@@ -977,8 +994,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
T2<Object, Object> t = updates.get(key);
if (updateFromClient) {
- if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
- try (Transaction tx = qryClient.transactions().txStart()) {
+ if (atomicityMode() != CacheAtomicityMode.ATOMIC) {
+ try (Transaction tx = qryClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
qryClientCache.put(key, key);
tx.commit();
@@ -993,8 +1010,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
qryClientCache.put(key, key);
}
else {
- if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
- try (Transaction tx = ignite.transactions().txStart()) {
+ if (atomicityMode() != CacheAtomicityMode.ATOMIC) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key, key);
tx.commit();
@@ -1755,18 +1772,30 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
if (filtered)
val = -val;
- if (processorPut && prevVal != null) {
- qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
- @Override public Void process(MutableEntry<Object, Object> entry,
- Object... arguments) throws EntryProcessorException {
- entry.setValue(arguments[0]);
+ boolean updated = false;
+
+ while (!updated) {
+ try {
+ if (processorPut && prevVal != null) {
+ qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.setValue(arguments[0]);
- return null;
+ return null;
+ }
+ }, val);
}
- }, val);
+ else
+ qryClnCache.put(key, val);
+
+ updated = true;
+ }
+ catch (CacheException e) {
+ assertTrue(X.hasCause(e, TransactionRollbackException.class));
+ assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+ }
}
- else
- qryClnCache.put(key, val);
processorPut = !processorPut;
@@ -2020,7 +2049,20 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
Integer val = valCntr.incrementAndGet();
- Integer prevVal = (Integer)qryClnCache.getAndPut(key, val);
+ Integer prevVal = null;
+
+ boolean updated = false;
+
+ while (!updated) {
+ try {
+ prevVal = (Integer)qryClnCache.getAndPut(key, val);
+
+ updated = true;
+ }
+ catch (CacheException e) {
+ assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+ }
+ }
expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal));
@@ -2114,7 +2156,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
@Override public Object call() throws Exception {
Integer val0 = val.getAndIncrement();
- cache.put(key, val0);
+ boolean updated = false;
+
+ while (!updated) {
+ try {
+ cache.put(key, val0);
+
+ updated = true;
+ }
+ catch (CacheException e) {
+ assertTrue(X.hasCause(e, TransactionRollbackException.class));
+ assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+ }
+ }
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java
new file mode 100644
index 0000000..2576d23
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Continuous query failover test for a REPLICATED cache in TRANSACTIONAL_SNAPSHOT (MVCC) mode.
+ */
+public class CacheContinuousQueryFailoverMvccTxReplicatedSelfTest extends CacheContinuousQueryFailoverMvccTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java
new file mode 100644
index 0000000..76d1689
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverMvccTxSelfTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Continuous query failover test for a PARTITIONED cache in TRANSACTIONAL_SNAPSHOT (MVCC) mode.
+ */
+public class CacheContinuousQueryFailoverMvccTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL_SNAPSHOT;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testBackupQueueEvict() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testLeftPrimaryAndBackupNodes() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10047");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
index ed97b1b..e08341a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
@@ -17,11 +17,14 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
@@ -29,23 +32,35 @@ import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
/**
* Basic continuous queries test with enabled mvcc.
@@ -227,4 +242,352 @@ public class CacheMvccBasicContinuousQueryTest extends CacheMvccAbstractTest {
}
}, CacheException.class, "Failed to run update. Transaction is too large. Consider reducing transaction size");
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdateCountersGapClosedSimplePartitioned() throws Exception {
+ checkUpdateCountersGapIsProcessedSimple(CacheMode.PARTITIONED);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdateCountersGapClosedSimpleReplicated() throws Exception {
+ checkUpdateCountersGapIsProcessedSimple(CacheMode.REPLICATED);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkUpdateCountersGapIsProcessedSimple(CacheMode cacheMode) throws Exception {
+ testSpi = true;
+
+ int srvCnt = 4;
+
+ startGridsMultiThreaded(srvCnt);
+
+ client = true;
+
+ IgniteEx nearNode = startGrid(srvCnt);
+
+ IgniteCache<Object, Object> cache = nearNode.createCache(
+ cacheConfiguration(cacheMode, FULL_SYNC, srvCnt - 1, srvCnt)
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ IgniteEx primary = grid(0);
+
+ List<Integer> keys = primaryKeys(primary.cache(DEFAULT_CACHE_NAME), 3);
+
+ ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+ List<CacheEntryEvent> arrivedEvts = new ArrayList<>();
+
+ CountDownLatch latch = new CountDownLatch(2);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent e : evts) {
+ arrivedEvts.add(e);
+
+ latch.countDown();
+ }
+ }
+ });
+
+ QueryCursor<Cache.Entry<Integer, Integer>> cur = nearNode.cache(DEFAULT_CACHE_NAME).query(qry);
+
+ // Initial value.
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(0))).getAll();
+
+ Transaction txA = nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+ // Block the first transaction's prepare requests (and all continuous messages) sent by the primary.
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(primary);
+
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ private final AtomicInteger limiter = new AtomicInteger();
+
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtTxPrepareRequest)
+ return limiter.getAndIncrement() < srvCnt - 1;
+
+ if (msg instanceof GridContinuousMessage)
+ return true;
+
+ return false;
+ }
+ });
+
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(1))).getAll();
+
+ txA.commitAsync();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return nearNode.context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == PREPARING);
+ }
+ }, 3_000);
+
+ GridTestUtils.runAsync(() -> {
+ try (Transaction txB = nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(2)));
+
+ txB.commit();
+ }
+ }).get();
+
+ long primaryUpdCntr = getUpdateCounter(primary, keys.get(0));
+
+ assertEquals(3, primaryUpdCntr); // There were three updates.
+
+ // Stop the primary node.
+ stopGrid(primary.name());
+
+ // Wait until all txs are rolled back.
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ boolean allRolledBack = true;
+
+ for (int i = 1; i < srvCnt; i++) {
+ boolean rolledBack = grid(i).context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == ROLLED_BACK);
+
+ allRolledBack &= rolledBack;
+ }
+
+ return allRolledBack;
+ }
+ }, 3_000);
+
+ for (int i = 1; i < srvCnt; i++) {
+ IgniteCache backupCache = grid(i).cache(DEFAULT_CACHE_NAME);
+
+ int size = backupCache.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+
+ long backupCntr = getUpdateCounter(grid(i), keys.get(0));
+
+ assertEquals(2, size);
+ assertEquals(primaryUpdCntr, backupCntr);
+ }
+
+ assertTrue(latch.await(3, SECONDS));
+
+ assertEquals(2, arrivedEvts.size());
+ assertEquals(keys.get(0), arrivedEvts.get(0).getKey());
+ assertEquals(keys.get(2), arrivedEvts.get(1).getKey());
+
+ cur.close();
+ nearNode.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdateCountersGapClosedPartitioned() throws Exception {
+ checkUpdateCountersGapsClosed(CacheMode.PARTITIONED);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdateCountersGapClosedReplicated() throws Exception {
+ checkUpdateCountersGapsClosed(CacheMode.REPLICATED);
+ }
+
+    /**
+     * Scenario: the first {@code srvCnt - 1} DHT prepare requests sent by the primary node are blocked, so the
+     * first transaction cannot be prepared on backups. Two more transactions are then committed for keys of the
+     * same partition and the primary node is stopped. The test expects every remaining node to contain
+     * {@code range * 2} rows and to have the same partition update counter the primary had, and the continuous
+     * query listener to receive {@code range * 2} events.
+     *
+     * @param cacheMode Cache mode.
+     * @throws Exception If failed.
+     */
+    private void checkUpdateCountersGapsClosed(CacheMode cacheMode) throws Exception {
+        testSpi = true;
+
+        int srvCnt = 4;
+
+        startGridsMultiThreaded(srvCnt);
+
+        IgniteEx nearNode = grid(srvCnt - 1);
+
+        IgniteCache<Object, Object> cache = nearNode.createCache(
+            cacheConfiguration(cacheMode, FULL_SYNC, srvCnt - 1, srvCnt)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        IgniteEx primary = grid(0);
+
+        Affinity<Object> aff = nearNode.affinity(cache.getName());
+
+        int[] nearBackupParts = aff.backupPartitions(nearNode.localNode());
+
+        int[] primaryParts = aff.primaryPartitions(primary.localNode());
+
+        Collection<Integer> nearSet = new HashSet<>();
+
+        for (int part : nearBackupParts)
+            nearSet.add(part);
+
+        Collection<Integer> primarySet = new HashSet<>();
+
+        for (int part : primaryParts)
+            primarySet.add(part);
+
+        // We need backup partitions on the near node.
+        nearSet.retainAll(primarySet);
+
+        // Fail with a clear message instead of NoSuchElementException from the iterator below.
+        assertFalse("No partition is primary on the primary node and backup on the near node.", nearSet.isEmpty());
+
+        List<Integer> keys = singlePartKeys(primary.cache(DEFAULT_CACHE_NAME), 20, nearSet.iterator().next());
+
+        int range = 3;
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        List<CacheEntryEvent<? extends Integer, ? extends Integer>> arrivedEvts = new ArrayList<>();
+
+        CountDownLatch latch = new CountDownLatch(range * 2);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    arrivedEvts.add(e);
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        // The cursor is closed first and the near node after it, on both success and failure paths.
+        try (QueryCursor<Cache.Entry<Integer, Integer>> cur = nearNode.cache(DEFAULT_CACHE_NAME).query(qry)) {
+            // Prevent first transaction prepare on backups.
+            TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(primary);
+
+            spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                private final AtomicInteger limiter = new AtomicInteger();
+
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    // Only the first (srvCnt - 1) prepare requests are blocked, later ones pass through.
+                    if (msg instanceof GridDhtTxPrepareRequest)
+                        return limiter.getAndIncrement() < srvCnt - 1;
+
+                    return false;
+                }
+            });
+
+            Transaction txA = primary.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+            for (int i = 0; i < range; i++)
+                primary.cache(DEFAULT_CACHE_NAME).put(keys.get(i), 2);
+
+            // Commit is started asynchronously and its future is not awaited.
+            txA.commitAsync();
+
+            // The second transaction is started and committed from another thread.
+            GridTestUtils.runAsync(() -> {
+                try (Transaction tx = primary.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int i = range; i < range * 2; i++)
+                        primary.cache(DEFAULT_CACHE_NAME).put(keys.get(i), 1);
+
+                    tx.commit();
+                }
+            }).get();
+
+            // The result is not checked: on timeout the test simply proceeds.
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return primary.context().cache().context().tm().activeTransactions().stream()
+                        .allMatch(tx -> tx.state() == PREPARING);
+                }
+            }, 3_000);
+
+            GridTestUtils.runAsync(() -> {
+                try (Transaction txB = primary.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int i = range * 2; i < range * 3; i++)
+                        primary.cache(DEFAULT_CACHE_NAME).put(keys.get(i), 3);
+
+                    txB.commit();
+                }
+            }).get();
+
+            long primaryUpdCntr = getUpdateCounter(primary, keys.get(0));
+
+            assertEquals(range * 3, primaryUpdCntr);
+
+            // Drop primary.
+            stopGrid(primary.name());
+
+            // Wait all txs are rolled back. The result is not checked: the assertions below follow either way.
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    boolean allRolledBack = true;
+
+                    for (int i = 1; i < srvCnt; i++) {
+                        boolean rolledBack = grid(i).context().cache().context().tm().activeTransactions().stream()
+                            .allMatch(tx -> tx.state() == ROLLED_BACK);
+
+                        allRolledBack &= rolledBack;
+                    }
+
+                    return allRolledBack;
+                }
+            }, 3_000);
+
+            for (int i = 1; i < srvCnt; i++) {
+                IgniteCache<Object, Object> backupCache = grid(i).cache(DEFAULT_CACHE_NAME);
+
+                int size = backupCache.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+
+                long backupCntr = getUpdateCounter(grid(i), keys.get(0));
+
+                assertEquals(range * 2, size);
+                assertEquals(primaryUpdCntr, backupCntr);
+            }
+
+            assertTrue(latch.await(5, SECONDS));
+
+            assertEquals(range * 2, arrivedEvts.size());
+        }
+        finally {
+            nearNode.close();
+        }
+    }
+
+    /**
+     * Collects the first {@code size} integer keys, scanning upwards from zero, that map to the given partition.
+     *
+     * @param primaryCache Cache whose node and affinity function are used for the lookup.
+     * @param size Number of keys.
+     * @param part Partition the returned keys must map to.
+     * @return Keys which belong to the given partition, in ascending order.
+     * @throws Exception If failed.
+     */
+    private List<Integer> singlePartKeys(IgniteCache<Object, Object> primaryCache, int size, int part) throws Exception {
+        Ignite ignite = primaryCache.unwrap(Ignite.class);
+
+        final Affinity<Object> aff = ignite.affinity(primaryCache.getName());
+
+        final ClusterNode locNode = ignite.cluster().localNode();
+
+        // Proceed only once the local node is primary for at least one partition.
+        boolean hasPrimaryParts = GridTestUtils.waitForCondition(
+            () -> aff.primaryPartitions(locNode).length > 0,
+            5000);
+
+        assertTrue(hasPrimaryParts);
+
+        List<Integer> keys = new ArrayList<>();
+
+        // Stop as soon as enough keys are collected or the candidate range is exhausted.
+        for (int key = 0; key < aff.partitions() * size * 10 && keys.size() < size; key++) {
+            if (aff.partition(key) == part)
+                keys.add(key);
+        }
+
+        assertEquals(size, keys.size());
+
+        return keys;
+    }
+
+    /**
+     * Reads the update counter of the local partition which the given key maps to in the default cache.
+     *
+     * @param node Node.
+     * @param key Key.
+     * @return Update counter of the partition which the key belongs to.
+     */
+    private long getUpdateCounter(IgniteEx node, Integer key) {
+        int partId = node.cachex(DEFAULT_CACHE_NAME).context().affinity().partition(key);
+
+        GridDhtLocalPartition part = node.cachex(DEFAULT_CACHE_NAME).context().dht().topology().localPartition(partId);
+
+        // A test assertion is used instead of the 'assert' keyword so the check also works with JVM assertions off.
+        assertNotNull("Local partition is not found [node=" + node.name() + ", part=" + partId + ']', part);
+
+        return part.updateCounter();
+    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f312e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
index 2aa3419..147eb47 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
@@ -19,10 +19,13 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverMvccTxSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverMvccTxReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverMvccTxSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
@@ -42,10 +45,13 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite {
suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverMvccTxSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverMvccTxReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicSelfTest.class);
suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class);
suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryAsyncFailoverMvccTxSelfTest.class);
return suite;
}