You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/02/07 12:46:16 UTC
[ignite] branch master updated: IGNITE-10755: MVCC: Fix CQ
registration in the middle of tx. This closes #5841.
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 937164e IGNITE-10755: MVCC: Fix CQ registration in the middle of tx. This closes #5841.
937164e is described below
commit 937164ea1a922ae32e54aa32b7356ef758599e87
Author: rkondakov <ko...@mail.ru>
AuthorDate: Thu Feb 7 15:45:49 2019 +0300
IGNITE-10755: MVCC: Fix CQ registration in the middle of tx. This closes #5841.
---
.../ignite/internal/GridEventConsumeHandler.java | 6 +
.../ignite/internal/GridMessageListenHandler.java | 6 +
.../processors/cache/CacheGroupContext.java | 73 ++++--
.../processors/cache/GridCacheContext.java | 10 +
.../processors/cache/GridCacheMapEntry.java | 4 +-
.../dht/GridDhtTxAbstractEnlistFuture.java | 2 +-
.../processors/cache/mvcc/MvccCachingManager.java | 280 +++++++++++++--------
.../continuous/CacheContinuousQueryHandler.java | 58 ++++-
.../continuous/CacheContinuousQueryListener.java | 7 +
.../continuous/CacheContinuousQueryManager.java | 38 +--
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../IgniteTxImplicitSingleStateImpl.java | 12 +
.../transactions/IgniteTxRemoteStateAdapter.java | 15 +-
.../cache/transactions/IgniteTxState.java | 8 +
.../cache/transactions/IgniteTxStateImpl.java | 15 +-
.../processors/cache/transactions/TxCounters.java | 2 +-
.../continuous/GridContinuousHandler.java | 5 +
.../continuous/GridContinuousProcessor.java | 3 +-
.../eviction/paged/PageEvictionMetricTest.java | 2 +-
.../PageEvictionPagesRecyclingAndReusingTest.java | 4 +-
.../eviction/paged/PageEvictionTouchOrderTest.java | 4 +-
.../PageEvictionWithRebalanceAbstractTest.java | 2 +-
...ntinuousQueryConcurrentPartitionUpdateTest.java | 5 +-
.../CacheContinuousQueryExecuteInPrimaryTest.java | 30 ++-
24 files changed, 411 insertions(+), 182 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 58d0d9a..e15cdd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
@@ -142,6 +143,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index c146eca..c6a2d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -132,6 +133,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
ctx.io().addUserMessageListener(topic, pred);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 46de040..4cdcf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -114,7 +116,11 @@ public class CacheGroupContext {
/** We modify content under lock, by making defensive copy, field always contains unmodifiable list. */
private volatile List<GridCacheContext> caches = Collections.unmodifiableList(new ArrayList<>());
- private volatile List<GridCacheContext> contQryCaches;
+ /** List of caches with registered CQ listeners. */
+ private List<GridCacheContext> contQryCaches;
+
+ /** ReadWriteLock to control the continuous query setup - this is to prevent the race between cache update and listener setup */
+ private final ReentrantReadWriteLock listenerLock = new ReentrantReadWriteLock();
/** */
private final IgniteLogger log;
@@ -883,17 +889,16 @@ public class CacheGroupContext {
assert sharedGroup() : cacheOrGroupName();
assert cctx.group() == this : cctx.name();
assert !cctx.isLocal() : cctx.name();
+ assert listenerLock.writeLock().isHeldByCurrentThread();
- synchronized (this) {
- List<GridCacheContext> contQryCaches = this.contQryCaches;
+ List<GridCacheContext> contQryCaches = this.contQryCaches;
- if (contQryCaches == null)
- contQryCaches = new ArrayList<>();
+ if (contQryCaches == null)
+ contQryCaches = new ArrayList<>();
- contQryCaches.add(cctx);
+ contQryCaches.add(cctx);
- this.contQryCaches = contQryCaches;
- }
+ this.contQryCaches = contQryCaches;
}
/**
@@ -903,20 +908,30 @@ public class CacheGroupContext {
assert sharedGroup() : cacheOrGroupName();
assert cctx.group() == this : cctx.name();
assert !cctx.isLocal() : cctx.name();
+ assert listenerLock.isWriteLockedByCurrentThread();
- synchronized (this) {
- List<GridCacheContext> contQryCaches = this.contQryCaches;
+ List<GridCacheContext> contQryCaches = this.contQryCaches;
- if (contQryCaches == null)
- return;
+ if (contQryCaches == null)
+ return;
- contQryCaches.remove(cctx);
+ contQryCaches.remove(cctx);
- if (contQryCaches.isEmpty())
- contQryCaches = null;
+ if (contQryCaches.isEmpty())
+ contQryCaches = null;
- this.contQryCaches = contQryCaches;
- }
+ this.contQryCaches = contQryCaches;
+ }
+
+
+ /**
+ * Obtain the group listeners lock. Write lock should be held to register/unregister listeners. Read lock should be
+ * held for CQ listeners notification.
+ *
+ * @return Lock for the CQ listeners.
+ */
+ public ReadWriteLock listenerLock() {
+ return listenerLock;
}
/**
@@ -935,7 +950,16 @@ public class CacheGroupContext {
if (isLocal())
return;
- List<GridCacheContext> contQryCaches = this.contQryCaches;
+ List<GridCacheContext> contQryCaches;
+
+ listenerLock.readLock().lock();
+
+ try {
+ contQryCaches = this.contQryCaches;
+ }
+ finally {
+ listenerLock.readLock().unlock();
+ }
if (contQryCaches == null)
return;
@@ -965,7 +989,18 @@ public class CacheGroupContext {
* @return {@code True} if at least one cache with registered CQ exists in this group.
*/
public boolean hasContinuousQueryCaches() {
- return !F.isEmpty(contQryCaches);
+ List<GridCacheContext> contQryCaches;
+
+ listenerLock.readLock().lock();
+
+ try {
+ contQryCaches = this.contQryCaches;
+
+ return !F.isEmpty(contQryCaches);
+ }
+ finally {
+ listenerLock.readLock().unlock();
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 2bdb275..e0b5ee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -2321,6 +2322,15 @@ public class GridCacheContext<K, V> implements Externalizable {
near().dht().context().statisticsEnabled = statisticsEnabled;
}
+ /**
+ * @param tx Transaction.
+ * @return {@code True} if continuous query listeners need to be notified.
+ */
+ public boolean hasContinuousQueryListeners(@Nullable IgniteInternalTx tx) {
+ return grp.sharedGroup() ? grp.hasContinuousQueryCaches() :
+ contQryMgr.notifyContinuousQueries(tx) && !F.isEmpty(contQryMgr.updateListeners(false, false));
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, igniteInstanceName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 373c4b3..4753e9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -257,7 +257,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
this.key = key;
this.hash = key.hashCode();
this.cctx = cctx;
- this.listenerLock = cctx.continuousQueries().getListenerReadLock();
+ this.listenerLock = cctx.group().listenerLock().readLock();
ver = cctx.shared().versions().startVersion();
}
@@ -5159,7 +5159,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
valid = entry.valid(tx.topologyVersion());
- boolean needOldVal = cctx.shared().mvccCaching().continuousQueryListeners(cctx, tx, entry.key()) != null;
+ boolean needOldVal = tx.txState().useMvccCaching(cctx.cacheId());
cctx.shared().database().checkpointReadLock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index e5adfcb..bc310ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -432,7 +432,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
assert entryProc != null || !op.isInvoke();
- boolean needOldVal = cctx.shared().mvccCaching().continuousQueryListeners(cctx, tx, key) != null;
+ boolean needOldVal = tx.txState().useMvccCaching(cctx.cacheId());
GridCacheUpdateTxResult res;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
index 8f83b6e..06dfe01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -52,15 +54,14 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
/**
- * Manager for caching MVCC transaction updates.
- * This updates can be used further in CQ, DR and other places.
+ * Manager for caching MVCC transaction updates. These updates can be used further in CQ, DR and other places.
*/
public class MvccCachingManager extends GridCacheSharedManagerAdapter {
/** Maximum possible transaction size when caching is enabled. */
public static final int TX_SIZE_THRESHOLD = IgniteSystemProperties.getInteger(IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD,
20_000);
- /** Cached enlist values*/
+ /** Cached enlist values. */
private final Map<GridCacheVersion, EnlistBuffer> enlistCache = new ConcurrentHashMap<>();
/** Counters map. Used for OOM prevention caused by the big transactions. */
@@ -68,6 +69,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
/**
* Adds enlisted tx entry to cache.
+ *
* @param key Key.
* @param val Value.
* @param ttl Time to live.
@@ -102,15 +104,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled()) {
log.debug("Added entry to mvcc cache: [key=" + key + ", val=" + val + ", oldVal=" + oldVal +
- ", primary=" + primary + ", mvccVer=" + mvccVer + ", cacheId=" + cacheId + ", ver=" + ver +']');
+ ", primary=" + primary + ", mvccVer=" + mvccVer + ", cacheId=" + cacheId + ", ver=" + ver + ']');
}
- GridCacheContext ctx0 = cctx.cacheContext(cacheId);
-
- // Do not cache updates if there is no DR or CQ enabled.
- if (!needDrReplicate(ctx0, key) &&
- F.isEmpty(continuousQueryListeners(ctx0, tx, key)) &&
- !ctx0.group().hasContinuousQueryCaches())
+ // Do not cache updates if neither DR nor CQ was enabled when the cache was added as active for the current tx.
+ if (!tx.txState().useMvccCaching(cacheId))
return;
AtomicInteger cntr = cntrs.computeIfAbsent(new TxKey(mvccVer.coordinatorVersion(), mvccVer.counter()),
@@ -124,17 +122,16 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
EnlistBuffer cached = enlistCache.computeIfAbsent(ver, v -> new EnlistBuffer());
- cached.add(primary ? null : futId, primary ? -1 : batchNum, key, e);
+ cached.add(primary ? null : futId, primary ? -1 : batchNum, e);
}
/**
- *
* @param tx Transaction.
* @param commit {@code True} if commit.
*/
public void onTxFinished(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
if (log.isDebugEnabled())
- log.debug("Transaction finished: [commit=" + commit + ", tx=" + tx + ']');
+ log.debug("Transaction finished: [commit=" + commit + ", tx=" + tx + ']');
if (tx.system() || tx.internal() || tx.mvccSnapshot() == null)
return;
@@ -143,129 +140,191 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
EnlistBuffer buf = enlistCache.remove(tx.xidVersion());
- if (buf == null)
- return;
+ Map<Integer, Map<KeyCacheObject, MvccTxEntry>> allCached = buf == null ? null : buf.getCached();
- Map<KeyCacheObject, MvccTxEntry> cached = buf.getCached();
+ TxCounters txCntrs = tx.txCounters(false);
- if (F.isEmpty(cached))
+ Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs == null ? null : txCntrs.updateCounters();
+
+ if (txCntrs == null || F.isEmpty(cntrsColl))
return;
- TxCounters txCntrs = tx.txCounters(false);
+ GridIntList cacheIds = tx.txState().cacheIds();
- assert txCntrs != null || !commit;
+ assert cacheIds != null;
- if (txCntrs == null)
- return;
+ for (int i = 0; i < cacheIds.size(); i++) {
+ int cacheId = cacheIds.get(i);
- Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs.updateCounters();
+ GridCacheContext ctx0 = cctx.cacheContext(cacheId);
- if (F.isEmpty(cntrsColl)) {
- assert !commit;
+ assert ctx0 != null;
- return;
- }
+ ctx0.group().listenerLock().readLock().lock();
- // cacheId -> partId -> initCntr -> cntr + delta.
- Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<>();
+ try {
+ boolean hasListeners = ctx0.hasContinuousQueryListeners(tx);
+ boolean drEnabled = ctx0.isDrEnabled();
- for (PartitionUpdateCountersMessage msg : cntrsColl) {
- for (int i = 0; i < msg.size(); i++) {
- Map<Integer, T2<AtomicLong, Long>> cntrPerPart =
- cntrsMap.computeIfAbsent(msg.cacheId(), k -> new HashMap<>());
+ if (!hasListeners && !drEnabled)
+ continue; // There are no listeners to notify.
- T2 prev = cntrPerPart.put(msg.partition(i),
- new T2<>(new AtomicLong(msg.initialCounter(i)), msg.initialCounter(i) + msg.updatesCount(i)));
+ // Get cached entries for the given cache.
+ Map<KeyCacheObject, MvccTxEntry> cached = allCached == null ? null : allCached.get(cacheId);
- assert prev == null;
- }
- }
+ Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = countersPerPartition(cntrsColl);
- // Feed CQ & DR with entries.
- for (Map.Entry<KeyCacheObject, MvccTxEntry> entry : cached.entrySet()) {
- MvccTxEntry e = entry.getValue();
+ Map<Integer, T2<AtomicLong, Long>> cntrPerCache = cntrsMap.get(cacheId);
- assert e.key().partition() != -1;
+ if (F.isEmpty(cntrPerCache))
+ continue; // No updates were made for this cache.
- Map<Integer, T2<AtomicLong, Long>> cntrPerCache = cntrsMap.get(e.cacheId());
+ boolean fakeEntries = false;
- GridCacheContext ctx0 = cctx.cacheContext(e.cacheId());
+ if (F.isEmpty(cached)) {
+ if (log.isDebugEnabled())
+ log.debug("Transaction updates were not cached fully (this can happen when listener started" +
+ " during the transaction execution). [tx=" + tx + ']');
- assert ctx0 != null && cntrPerCache != null;
+ if (hasListeners) {
+ cached = createFakeCachedEntries(cntrPerCache, tx, cacheId); // Create fake update entries if we have CQ listeners.
- T2<AtomicLong, Long> cntr = cntrPerCache.get(e.key().partition());
+ fakeEntries = true;
+ }
+ else
+ continue; // Nothing further to do if tx is not cached entirely and there are no CQ listeners.
+ }
- long resCntr = cntr.getKey().incrementAndGet();
+ if (F.isEmpty(cached))
+ continue;
- assert resCntr <= cntr.getValue();
+ // Feed CQ & DR with entries.
+ for (Map.Entry<KeyCacheObject, MvccTxEntry> entry : cached.entrySet()) {
+ MvccTxEntry e = entry.getValue();
- e.updateCounter(resCntr);
+ assert e.key().partition() != -1;
- if (ctx0.group().sharedGroup()) {
- ctx0.group().onPartitionCounterUpdate(ctx0.cacheId(), e.key().partition(), resCntr,
- tx.topologyVersion(), tx.local());
- }
+ assert cntrPerCache != null;
+ assert e.cacheId() == cacheId;
- if (log.isDebugEnabled())
- log.debug("Process cached entry:" + e);
+ T2<AtomicLong, Long> cntr = cntrPerCache.get(e.key().partition());
- // DR
- if (ctx0.isDrEnabled()) {
- ctx0.dr().replicate(e.key(), e.value(), e.ttl(), e.expireTime(), e.version(),
- tx.local() ? DR_PRIMARY : DR_BACKUP, e.topologyVersion());
- }
+ long resCntr = cntr.getKey().incrementAndGet();
+
+ assert resCntr <= cntr.getValue();
- // CQ
- CacheContinuousQueryManager contQryMgr = ctx0.continuousQueries();
-
- if (ctx0.continuousQueries().notifyContinuousQueries(tx)) {
- contQryMgr.getListenerReadLock().lock();
-
- try {
- Map<UUID, CacheContinuousQueryListener> lsnrCol = continuousQueryListeners(ctx0, tx, e.key());
-
- if (!F.isEmpty(lsnrCol)) {
- contQryMgr.onEntryUpdated(
- lsnrCol,
- e.key(),
- commit ? e.value() : null, // Force skip update counter if rolled back.
- commit ? e.oldValue() : null, // Force skip update counter if rolled back.
- false,
- e.key().partition(),
- tx.local(),
- false,
- e.updateCounter(),
- null,
- e.topologyVersion());
+ e.updateCounter(resCntr);
+
+ if (ctx0.group().sharedGroup()) {
+ ctx0.group().onPartitionCounterUpdate(cacheId, e.key().partition(), resCntr,
+ tx.topologyVersion(), tx.local());
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Process cached entry:" + e);
+
+ // DR
+ if (ctx0.isDrEnabled() && !fakeEntries) {
+ ctx0.dr().replicate(e.key(), e.value(), e.ttl(), e.expireTime(), e.version(),
+ tx.local() ? DR_PRIMARY : DR_BACKUP, e.topologyVersion());
+ }
+
+ // CQ
+ CacheContinuousQueryManager contQryMgr = ctx0.continuousQueries();
+
+ if (ctx0.continuousQueries().notifyContinuousQueries(tx)) {
+ Map<UUID, CacheContinuousQueryListener> lsnrCol = continuousQueryListeners(ctx0, tx);
+
+ if (!F.isEmpty(lsnrCol)) {
+ contQryMgr.onEntryUpdated(
+ lsnrCol,
+ e.key(),
+ commit ? e.value() : null, // Force skip update counter if rolled back.
+ commit ? e.oldValue() : null, // Force skip update counter if rolled back.
+ false,
+ e.key().partition(),
+ tx.local(),
+ false,
+ e.updateCounter(),
+ null,
+ e.topologyVersion());
+ }
}
- }
- finally {
- contQryMgr.getListenerReadLock().unlock();
}
}
+ finally {
+ ctx0.group().listenerLock().readLock().unlock();
+ }
}
}
/**
- * @param ctx0 Cache context.
- * @param key Key.
- * @return {@code True} if need to replicate this value.
+ * Calculates counters updates per cache and partition: cacheId -> partId -> initCntr -> cntr + delta.
+ *
+ * @param cntrsColl Counters collection.
+ * @return Counters updates per cache and partition.
*/
- public boolean needDrReplicate(GridCacheContext ctx0, KeyCacheObject key) {
- return ctx0.isDrEnabled() && !key.internal();
+ private Map<Integer, Map<Integer, T2<AtomicLong, Long>>> countersPerPartition(
+ Collection<PartitionUpdateCountersMessage> cntrsColl) {
+ //
+ Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<>();
+
+ for (PartitionUpdateCountersMessage msg : cntrsColl) {
+ for (int i = 0; i < msg.size(); i++) {
+ Map<Integer, T2<AtomicLong, Long>> cntrPerPart =
+ cntrsMap.computeIfAbsent(msg.cacheId(), k -> new HashMap<>());
+
+ T2 prev = cntrPerPart.put(msg.partition(i),
+ new T2<>(new AtomicLong(msg.initialCounter(i)), msg.initialCounter(i) + msg.updatesCount(i)));
+
+ assert prev == null;
+ }
+ }
+
+ return cntrsMap;
+ }
+
+ /**
+ * If transaction was not cached entirely (if listener was set during tx execution), we should feed the CQ engine
+ * with fake entries prepared by this method.
+ *
+ * @param cntrPerCache Update counters deltas made by transaction.
+ * @param tx Transaction.
+ * @param cacheId Cache id.
+ * @return Fake entries for each tx update.
+ */
+ private Map<KeyCacheObject, MvccTxEntry> createFakeCachedEntries(Map<Integer, T2<AtomicLong, Long>> cntrPerCache,
+ IgniteInternalTx tx, int cacheId) {
+ Map<KeyCacheObject, MvccTxEntry> fakeCached = new HashMap<>();
+
+ for (Map.Entry<Integer, T2<AtomicLong, Long>> e : cntrPerCache.entrySet()) {
+ int part = e.getKey();
+
+ long startCntr = e.getValue().get1().get(); // Init update counter.
+ long endCntr = e.getValue().get1().get() + e.getValue().get2(); // Init update counter + delta.
+
+ for (long i = startCntr; i < endCntr; i++) {
+ KeyCacheObject fakeKey = new KeyCacheObjectImpl("", null, part);
+
+ MvccTxEntry fakeEntry = new MvccTxEntry(fakeKey, null, 0, 0, tx.xidVersion(), null,
+ tx.local(), tx.topologyVersion(), tx.mvccSnapshot(), cacheId);
+
+ fakeCached.put(fakeKey, fakeEntry);
+ }
+ }
+
+ return fakeCached;
}
/**
* @param ctx0 Cache context.
* @param tx Transaction.
- * @param key Key.
* @return Map of listeners to be notified by this update.
*/
- public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0, @Nullable IgniteInternalTx tx, KeyCacheObject key) {
- boolean internal = key != null && key.internal() || !ctx0.userCache();
-
+ public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0,
+ @Nullable IgniteInternalTx tx) {
return ctx0.continuousQueries().notifyContinuousQueries(tx) ?
- ctx0.continuousQueries().updateListeners(internal, false) : null;
+ ctx0.continuousQueries().updateListeners(!ctx0.userCache(), false) : null;
}
/**
@@ -276,11 +335,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
/** Last DHT future id. */
private IgniteUuid lastFutId;
- /** Main buffer for entries. */
+ /** Main buffer for entries. CacheId -> entriesMap. */
@GridToStringInclude
- private Map<KeyCacheObject, MvccTxEntry> cached = new LinkedHashMap<>();
+ private Map<Integer, Map<KeyCacheObject, MvccTxEntry>> cached = new TreeMap<>();
- /** Pending entries. */
+ /** Pending entries. BatchId -> entriesMap. */
@GridToStringInclude
private SortedMap<Integer, Map<KeyCacheObject, MvccTxEntry>> pending;
@@ -289,10 +348,11 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
*
* @param futId Future id.
* @param batchNum Batch number.
- * @param key Key.
* @param e Entry.
*/
- synchronized void add(IgniteUuid futId, int batchNum, KeyCacheObject key, MvccTxEntry e) {
+ synchronized void add(IgniteUuid futId, int batchNum, MvccTxEntry e) {
+ KeyCacheObject key = e.key();
+
if (batchNum >= 0) {
/*
* Assume that batches within one future may be reordered. But batches between futures cannot be
@@ -307,7 +367,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
}
if (pending == null)
- pending = new TreeMap<>() ;
+ pending = new TreeMap<>();
MvccTxEntry prev = pending.computeIfAbsent(batchNum, k -> new LinkedHashMap<>()).put(key, e);
@@ -317,8 +377,12 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
else { // batchNum == -1 means no reordering (e.g. this is a primary node).
assert batchNum == -1;
- MvccTxEntry prev = cached.put(key, e);
+ Map<KeyCacheObject, MvccTxEntry> entriesForCache = cached.computeIfAbsent(e.cacheId(), k -> new LinkedHashMap<>());
+ MvccTxEntry prev = entriesForCache.put(key, e);
+
+ // If key is updated more than once within transaction, we should copy old value
+ // (the value existed before tx started) from the previous entry to the new one.
if (prev != null && prev.oldValue() != null)
e.oldValue(prev.oldValue());
}
@@ -327,7 +391,7 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
/**
* @return Cached entries map.
*/
- synchronized Map<KeyCacheObject, MvccTxEntry> getCached() {
+ synchronized Map<Integer, Map<KeyCacheObject, MvccTxEntry>> getCached() {
flushPending();
return cached;
@@ -343,7 +407,17 @@ public class MvccCachingManager extends GridCacheSharedManagerAdapter {
for (Map.Entry<Integer, Map<KeyCacheObject, MvccTxEntry>> entry : pending.entrySet()) {
Map<KeyCacheObject, MvccTxEntry> vals = entry.getValue();
- cached.putAll(vals);
+ for (Map.Entry<KeyCacheObject, MvccTxEntry> e : vals.entrySet()) {
+ Map<KeyCacheObject, MvccTxEntry> entriesForCache = cached
+ .computeIfAbsent(e.getValue().cacheId(), k -> new LinkedHashMap<>());
+
+ MvccTxEntry prev = entriesForCache.put(e.getKey(), e.getValue());
+
+ // If key is updated more than once within transaction, we should copy old value
+ // (the value existed before tx started) from the previous entry to the new one.
+ if (prev != null && prev.oldValue() != null)
+ e.getValue().oldValue(prev.oldValue());
+ }
}
pending.clear();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 503a514..7972c150 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -54,8 +54,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
@@ -78,6 +78,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap;
/**
* Continuous query handler.
@@ -185,6 +186,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** */
private transient UUID routineId;
+ /** Local update counters values on listener start. Used for skipping events fired before the listener start. */
+ private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs;
+
/** */
private transient GridKernalContext ctx;
@@ -308,6 +312,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/** {@inheritDoc} */
+ @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+ return locInitUpdCntrs;
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
@@ -368,6 +377,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+ //noinspection unchecked
ctx.event().record(new CacheQueryExecutedEvent<>(
ctx.discovery().localNode(),
"Continuous query executed.",
@@ -386,6 +396,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
}
+ @Override public void onRegister() {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ if (cctx != null && !cctx.isLocal())
+ locInitUpdCntrs = toCountersMap(cctx.topology().localUpdateCounters(false));
+ }
+
@Override public boolean keepBinary() {
return keepBinary;
}
@@ -406,6 +423,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (cctx == null)
return;
+ if (!needNotify(false, cctx, -1, -1, evt))
+ return;
+
// skipPrimaryCheck is set only when listen locally for replicated cache events.
assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
@@ -513,6 +533,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (skipCtx == null)
skipCtx = new CounterSkipContext(part, cntr, topVer);
+ if (!needNotify(true, cctx, part, cntr, null))
+ return skipCtx;
+
if (loc) {
assert !locOnly;
@@ -587,6 +610,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
@Override public boolean isPrimaryOnly() {
return locOnly && !skipPrimaryCheck;
}
+
+ /**
+ * Checks whether listeners need to be notified.
+ *
+ * @param skipEvt {@code True} if this is a skip counter event.
+ * @param cctx Cache context.
+ * @param part Partition id.
+ * @param cntr Update counter.
+ * @param evt CQ event.
+ * @return {@code True} if the listener should be notified, or {@code false} if the event should be skipped because it was fired before the listener start.
+ */
+ private boolean needNotify(boolean skipEvt,
+ GridCacheContext cctx,
+ int part,
+ long cntr,
+ CacheContinuousQueryEvent evt) {
+ assert !skipEvt || evt == null;
+ assert skipEvt || part == -1 && cntr == -1; // Regular events pass -1 for both; the real values are read from evt below.
+
+ if (!cctx.mvccEnabled() || cctx.isLocal())
+ return true;
+
+ assert locInitUpdCntrs != null;
+
+ cntr = skipEvt ? cntr : evt.getPartitionUpdateCounter();
+ part = skipEvt ? part : evt.partitionId();
+
+ T2<Long, Long> initCntr = locInitUpdCntrs.get(part);
+
+ // Do not notify listener if entry was updated before the query was started.
+ return initCntr == null || cntr >= initCntr.get2();
+ }
};
CacheContinuousQueryManager mgr = manager(ctx);
@@ -905,6 +960,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
if (recordIgniteEvt && notify) {
+ //noinspection unchecked
ctx.event().record(new CacheQueryReadEvent<>(
ctx.discovery().localNode(),
"Continuous query executed.",
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 34e3d86..a652c51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.Map;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.jetbrains.annotations.Nullable;
@@ -45,6 +46,12 @@ public interface CacheContinuousQueryListener<K, V> {
boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut);
/**
+ * Listener registration callback.
+ * NOTE: This method should be called with the {@link CacheGroupContext#listenerLock} write lock held.
+ */
+ public void onRegister();
+
+ /**
* Listener unregistered callback.
*/
public void onUnregister();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 5b93a7c..b9f8fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
@@ -69,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
@@ -131,9 +129,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/** Ordered topic prefix. */
private String topicPrefix;
- /** ReadWriteLock to control the continuous query setup - this is to prevent the race between cache update and listener setup */
- private final StripedCompositeReadWriteLock listenerLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()) ;
-
/** Cancelable future task for backup cleaner */
private GridTimeoutProcessor.CancelableTask cancelableTask;
@@ -203,16 +198,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
- * Obtain the listener read lock, which must be held if any component need to
- * read the list listener (generally caller to updateListener).
- *
- * @return Read lock for the listener update
- */
- public Lock getListenerReadLock() {
- return listenerLock.readLock();
- }
-
- /**
* @param tx Transaction.
* @return {@code True} if should notify continuous query manager.
*/
@@ -956,21 +941,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
intLsnrCnt.incrementAndGet();
}
else {
- listenerLock.writeLock().lock();
+ cctx.group().listenerLock().writeLock().lock();
try {
- if (lsnrCnt.get() == 0) {
- if (cctx.group().sharedGroup() && !cctx.isLocal())
- cctx.group().addCacheWithContinuousQuery(cctx);
- }
-
added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
- if (added)
+ if (added) {
lsnrCnt.incrementAndGet();
+
+ lsnr.onRegister();
+
+ if (lsnrCnt.get() == 1) {
+ if (cctx.group().sharedGroup() && !cctx.isLocal())
+ cctx.group().addCacheWithContinuousQuery(cctx);
+ }
+ }
}
finally {
- listenerLock.writeLock().unlock();
+ cctx.group().listenerLock().writeLock().unlock();
}
if (added)
@@ -996,7 +984,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
else {
- listenerLock.writeLock().lock();
+ cctx.group().listenerLock().writeLock().lock();
try {
if ((lsnr = lsnrs.remove(id)) != null) {
@@ -1007,7 +995,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
finally {
- listenerLock.writeLock().unlock();
+ cctx.group().listenerLock().writeLock().unlock();
}
if (lsnr != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index dd7f108..2d31360 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1884,7 +1884,7 @@ public class IgniteTxHandler {
EntryProcessor entryProc = null;
Object[] invokeArgs = null;
- boolean needOldVal = ctx.shared().mvccCaching().continuousQueryListeners(ctx, tx, key) != null;
+ boolean needOldVal = tx.txState().useMvccCaching(ctx.cacheId());
Message val0 = vals != null ? vals.get(i) : null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index b1e1b02..c5f736a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -58,6 +58,9 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** */
private boolean recovery;
+ /** */
+ private volatile boolean useMvccCaching;
+
/** {@inheritDoc} */
@Override public void addActiveCache(GridCacheContext ctx, boolean recovery, IgniteTxAdapter tx)
throws IgniteCheckedException {
@@ -68,6 +71,8 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
this.recovery = recovery;
tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
+
+ useMvccCaching = cacheCtx.mvccEnabled() && (cacheCtx.isDrEnabled() || cacheCtx.hasContinuousQueryListeners(tx));
}
/** {@inheritDoc} */
@@ -307,6 +312,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean useMvccCaching(int cacheId) {
+ assert cacheCtx == null || cacheCtx.cacheId() == cacheId;
+
+ return useMvccCaching;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean recovery() {
return recovery;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index c1d973e..e145e95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.jetbrains.annotations.Nullable;
@@ -35,6 +36,9 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
/** Active cache IDs. */
private GridIntList activeCacheIds = new GridIntList();
+ /** Cache ids used for mvcc caching. See {@link MvccCachingManager}. */
+ private GridIntList mvccCachingCacheIds = new GridIntList();
+
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
return false;
@@ -81,8 +85,12 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
int cacheId = cctx.cacheId();
// Check if we can enlist new cache to transaction.
- if (!activeCacheIds.contains(cacheId))
+ if (!activeCacheIds.contains(cacheId)) {
activeCacheIds.add(cacheId);
+
+ if (cctx.mvccEnabled() && (cctx.hasContinuousQueryListeners(tx) || cctx.isDrEnabled()))
+ mvccCachingCacheIds.add(cacheId);
+ }
}
/** {@inheritDoc} */
@@ -116,4 +124,9 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
@Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
assert false;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean useMvccCaching(int cacheId) {
+ return mvccCachingCacheIds.contains(cacheId);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 2039cc92..6feab42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -193,4 +194,11 @@ public interface IgniteTxState {
* @return {@code True} if MVCC mode is enabled for transaction.
*/
public boolean mvccEnabled();
+
+ /**
+ * @param cacheId Cache id.
+ * @return {@code True} if updates made by the current TX for the given cache need to be stored in the heap.
+ * These updates will be used for CQ and DR. See {@link MvccCachingManager}.
+ */
+ public boolean useMvccCaching(int cacheId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 9cbea0f..b5a0539 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -76,6 +77,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
@GridToStringInclude
protected Boolean mvccEnabled;
+ /** Cache ids used for mvcc caching. See {@link MvccCachingManager}. */
+ private GridIntList mvccCachingCacheIds = new GridIntList();
+
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
return false;
@@ -261,9 +265,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
", cacheSystem=" + cacheCtx.systemTx() +
", txSystem=" + tx.system() + ']');
}
- else
+ else {
activeCacheIds.add(cacheId);
+ if (cacheCtx.mvccEnabled() && (cacheCtx.hasContinuousQueryListeners(tx) || cacheCtx.isDrEnabled()))
+ mvccCachingCacheIds.add(cacheId);
+ }
+
if (activeCacheIds.size() == 1)
tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
}
@@ -491,6 +499,11 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean useMvccCaching(int cacheId) {
+ return mvccCachingCacheIds.contains(cacheId);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteTxStateImpl.class, this, "txMap", allEntriesCopy());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
index b12ee56..515aeef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -37,7 +37,7 @@ public class TxCounters {
private final Map<Integer, Map<Integer, AtomicLong>> updCntrsAcc = new HashMap<>();
/** Final update counters for cache partitions in the end of transaction */
- private Collection<PartitionUpdateCountersMessage> updCntrs;
+ private volatile Collection<PartitionUpdateCountersMessage> updCntrs;
/** Counter tracking number of entries locked by tx. */
private final AtomicInteger lockCntr = new AtomicInteger();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 9801746..7cd4bf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -163,4 +163,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
*/
public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs);
+
+ /**
+ * @return Initial state of partition counters.
+ */
+ public Map<Integer, T2<Long, Long>> updateCounters();
}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 2456efa..bbd2290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1423,8 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal() && cache.context().userCache())
- req.addUpdateCounters(ctx.localNodeId(),
- toCountersMap(cache.context().topology().localUpdateCounters(false)));
+ req.addUpdateCounters(ctx.localNodeId(), hnd0.updateCounters());
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java
index be1705a..9c2facc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java
@@ -53,7 +53,7 @@ public class PageEvictionMetricTest extends PageEvictionAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
@Test
public void testPageEvictionMetricMvcc() throws Exception {
checkPageEvictionMetric(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java
index 1b54833..13e54eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java
@@ -91,7 +91,7 @@ public class PageEvictionPagesRecyclingAndReusingTest extends PageEvictionAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
@Test
public void testPagesRecyclingAndReusingMvccTxPartitioned() throws Exception {
testPagesRecyclingAndReusing(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.PARTITIONED);
@@ -101,7 +101,7 @@ public class PageEvictionPagesRecyclingAndReusingTest extends PageEvictionAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
@Test
public void testPagesRecyclingAndReusingMvccTxReplicated() throws Exception {
testPagesRecyclingAndReusing(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.REPLICATED);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java
index 8306084..12a2097 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java
@@ -79,7 +79,7 @@ public class PageEvictionTouchOrderTest extends PageEvictionAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448,https://issues.apache.org/jira/browse/IGNITE-7956")
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738,https://issues.apache.org/jira/browse/IGNITE-7956")
@Test
public void testTouchOrderWithFairFifoEvictionMvccTxReplicated() throws Exception {
testTouchOrderWithFairFifoEviction(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.REPLICATED);
@@ -88,7 +88,7 @@ public class PageEvictionTouchOrderTest extends PageEvictionAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448,https://issues.apache.org/jira/browse/IGNITE-7956")
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738,https://issues.apache.org/jira/browse/IGNITE-7956")
@Test
public void testTouchOrderWithFairFifoEvictionMvccTxPartitioned() throws Exception {
testTouchOrderWithFairFifoEviction(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.PARTITIONED);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java
index 38b5dc7..cedd773 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java
@@ -41,7 +41,7 @@ public abstract class PageEvictionWithRebalanceAbstractTest extends PageEviction
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10448")
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-10738")
@Test
public void testEvictionWithRebalanceMvcc() throws Exception {
checkEvictionWithRebalance(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index 928c33a..2757650 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionSerializationException;
-import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -315,7 +314,6 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10755")
@Test
public void testConcurrentUpdatesAndQueryStartMvccTxCacheGroup() throws Exception {
concurrentUpdatesAndQueryStart(TRANSACTIONAL_SNAPSHOT, true);
@@ -429,7 +427,8 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
U.sleep(1000);
for (String cache : caches)
- qrys.add(startListener(client.cache(cache)));
+ for (int l = 0; l < 10; l++)
+ qrys.add(startListener(client.cache(cache)));
U.sleep(1000);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
index dd01540..bbcd62f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryExecuteInPrimaryTest.java
@@ -17,9 +17,20 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.FactoryBuilder;
-
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -33,19 +44,6 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.NotNull;
-
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import org.junit.Test;
@@ -228,7 +226,7 @@ public class CacheContinuousQueryExecuteInPrimaryTest extends GridCommonAbstract
}
}));
- executeQuery(cache, qry, ccfg.getAtomicityMode() == TRANSACTIONAL);
+ executeQuery(cache, qry, ccfg.getAtomicityMode() != ATOMIC);
}
assertTrue(noOneListen.get());
@@ -326,7 +324,7 @@ public class CacheContinuousQueryExecuteInPrimaryTest extends GridCommonAbstract
));
// Execute query.
- executeQuery(cache, qry, ccfg.getAtomicityMode() == TRANSACTIONAL);
+ executeQuery(cache, qry, ccfg.getAtomicityMode() != ATOMIC);
assertTrue(latch.await(LATCH_TIMEOUT, MILLISECONDS));
assertEquals(16, cnt.get());