You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/02/22 15:32:34 UTC
[ignite] branch master updated: IGNITE-16587 Fix putAllConflict, removeAllConflict for tx caches (#9836)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4b81346 IGNITE-16587 Fix putAllConflict, removeAllConflict for tx caches (#9836)
4b81346 is described below
commit 4b813466bbf01d5335da7f25d7c1ffe8e08f51a6
Author: Nikolay <ni...@apache.org>
AuthorDate: Tue Feb 22 18:31:56 2022 +0300
IGNITE-16587 Fix putAllConflict, removeAllConflict for tx caches (#9836)
---
.../processors/cache/GridCacheMapEntry.java | 18 +-
.../GridDistributedTxRemoteAdapter.java | 3 +-
.../dht/colocated/GridDhtDetachedCacheEntry.java | 8 +-
.../cache/distributed/near/GridNearCacheEntry.java | 8 +-
.../cache/distributed/near/GridNearTxLocal.java | 28 +-
.../cache/version/GridCacheVersionEx.java | 14 +-
.../org/apache/ignite/cdc/CdcCacheVersionTest.java | 380 ++++++++++++++++-----
7 files changed, 338 insertions(+), 121 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 456f652..951b41c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -79,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -129,6 +128,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_MAX_SNAPSHOT;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compareIgnoreOpCounter;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY;
+import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx.addConflictVersion;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
@@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateCntr0 = nextPartitionCounter(tx, updateCntr);
if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = logTxUpdate(tx, val, expireTime, updateCntr0);
+ logPtr = logTxUpdate(tx, val, addConflictVersion(tx.writeVersion(), newVer), expireTime, updateCntr0);
update(val, expireTime, ttl, newVer, true);
@@ -1787,7 +1787,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
updateCntr0 = nextPartitionCounter(tx, updateCntr);
if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
- logPtr = logTxUpdate(tx, null, 0, updateCntr0);
+ logPtr = logTxUpdate(tx, null, addConflictVersion(tx.writeVersion(), newVer), 0, updateCntr0);
drReplicate(drType, null, newVer, topVer);
@@ -4352,6 +4352,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param tx Transaction.
* @param val Value.
+ * @param writeVer New entry version.
* @param expireTime Expire time (or 0 if not applicable).
* @param updCntr Update counter.
* @throws IgniteCheckedException In case of log failure.
@@ -4359,6 +4360,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
protected WALPointer logTxUpdate(
IgniteInternalTx tx,
CacheObject val,
+ GridCacheVersion writeVer,
long expireTime,
long updCntr
) throws IgniteCheckedException {
@@ -4377,7 +4379,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val,
op,
tx.nearXidVersion(),
- tx.writeVersion(),
+ writeVer,
expireTime,
key.partition(),
updCntr,
@@ -6229,13 +6231,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
// Incorporate conflict version into new version if needed.
- if (conflictVer != null && conflictVer != newVer) {
- newVer = new GridCacheVersionEx(newVer.topologyVersion(),
- newVer.order(),
- newVer.nodeOrder(),
- newVer.dataCenterId(),
- conflictVer);
- }
+ newVer = addConflictVersion(newVer, conflictVer);
if (op == UPDATE) {
assert writeObj != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 89be03f..72be779 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -88,6 +88,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REL
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx.addConflictVersion;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -628,7 +629,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
val,
op,
nearXidVersion(),
- writeVersion(),
+ addConflictVersion(writeVersion(), txEntry.conflictVersion()),
0,
txEntry.key().partition(),
txEntry.updateCounter(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 7c66afb..64fbb03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -85,7 +85,13 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr) {
+ @Override protected WALPointer logTxUpdate(
+ IgniteInternalTx tx,
+ CacheObject val,
+ GridCacheVersion writeVer,
+ long expireTime,
+ long updCntr
+ ) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index d5d201e..1eee3f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -473,7 +473,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr) {
+ @Override protected WALPointer logTxUpdate(
+ IgniteInternalTx tx,
+ CacheObject val,
+ GridCacheVersion writeVer,
+ long expireTime,
+ long updCntr
+ ) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 733ec86..cbea2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1253,20 +1253,20 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
else if (drRmvMap != null) {
assert drRmvMap.get(key) != null;
- drVer = drRmvMap.get(key);
- drTtl = -1L;
- drExpireTime = -1L;
- }
- else if (dataCenterId != null) {
- drVer = cacheCtx.cache().nextVersion(dataCenterId);
- drTtl = -1L;
- drExpireTime = -1L;
- }
- else {
- drVer = null;
- drTtl = -1L;
- drExpireTime = -1L;
- }
+ drVer = drRmvMap.get(key);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else if (dataCenterId != null) {
+ drVer = cacheCtx.cache().nextVersion(dataCenterId);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else {
+ drVer = null;
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
if (!rmv && val == null && entryProcessor == null) {
setRollbackOnly();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
index 4848893..5a08a7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
@@ -52,7 +52,7 @@ public class GridCacheVersionEx extends GridCacheVersion {
* @param dataCenterId Data center ID.
* @param drVer DR version.
*/
- public GridCacheVersionEx(int topVer, long order, int nodeOrder, byte dataCenterId,
+ private GridCacheVersionEx(int topVer, long order, int nodeOrder, byte dataCenterId,
GridCacheVersion drVer) {
super(topVer, order, nodeOrder, dataCenterId);
@@ -171,4 +171,16 @@ public class GridCacheVersionEx extends GridCacheVersion {
", dataCenterId=" + dataCenterId() +
", drVer=" + drVer + ']';
}
+
+ /** @return {@code ver} itself if {@code conflictVer} is {@code null} or the same instance as {@code ver}, otherwise a new version copied from {@code ver} with {@code conflictVer} attached. */
+ public static GridCacheVersion addConflictVersion(GridCacheVersion ver, GridCacheVersion conflictVer) {
+ if (conflictVer == null || conflictVer == ver)
+ return ver;
+
+ return new GridCacheVersionEx(ver.topologyVersion(),
+ ver.order(),
+ ver.nodeOrder(),
+ ver.dataCenterId(),
+ conflictVer);
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
index efea107..8d830c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
@@ -17,19 +17,31 @@
package org.apache.ignite.cdc;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
@@ -39,35 +51,44 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
-import org.apache.ignite.internal.processors.metric.MetricRegistry;
-import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.plugin.AbstractCachePluginProvider;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.spi.metric.IntMetric;
import org.apache.ignite.spi.systemview.view.CacheView;
import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.CacheMetricsImpl.CACHE_METRICS;
import static org.apache.ignite.internal.processors.cache.ClusterCachesInfo.CACHES_VIEW;
import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager.DATA_VER_CLUSTER_ID;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** */
+@RunWith(Parameterized.class)
public class CdcCacheVersionTest extends AbstractCdcTest {
/** */
- public static final String FOR_OTHER_CLUSTER_ID = "for-other-cluster-id";
-
- /** */
public static final byte DFLT_CLUSTER_ID = 1;
/** */
@@ -76,6 +97,43 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
/** */
public static final int KEY_TO_UPD = 42;
+ /** */
+ @Parameterized.Parameter
+ public CacheAtomicityMode atomicityMode;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public CacheMode cacheMode;
+
+ /** */
+ @Parameterized.Parameter(2)
+ public int gridCnt;
+
+ /** */
+ private final AtomicLong walRecCheckedCntr = new AtomicLong();
+
+ /** */
+ private final AtomicLong conflictCheckedCntr = new AtomicLong();
+
+ /** */
+ private volatile Function<GridKernalContext, IgniteWriteAheadLogManager> walProvider;
+
+ /** */
+ private volatile Supplier<CacheVersionConflictResolver> conflictResolutionMgrSupplier;
+
+ /** */
+ @Parameterized.Parameters(name = "atomicity={0}, mode={1}, gridCnt={2}")
+ public static Collection<?> parameters() {
+ List<Object[]> params = new ArrayList<>();
+
+ for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL))
+ for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED))
+ for (int gridCnt : new int[] {1, 3})
+ params.add(new Object[] {atomicity, mode, gridCnt});
+
+ return params;
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -91,139 +149,260 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
}
@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
- if (!ctx.igniteCacheConfiguration().getName().equals(FOR_OTHER_CLUSTER_ID))
+ if (!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME))
return null;
return new AbstractCachePluginProvider() {
@Override public @Nullable Object createComponent(Class cls) {
- if (cls != CacheConflictResolutionManager.class)
+ if (cls != CacheConflictResolutionManager.class || conflictResolutionMgrSupplier == null)
return null;
- return new TestCacheConflictResolutionManager();
+ return new TestCacheConflictResolutionManager<>();
}
};
}
+
+ @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+ if (IgniteWriteAheadLogManager.class.equals(cls))
+ return (T)walProvider.apply(((IgniteEx)ctx.grid()).context());
+
+ return null;
+ }
});
return cfg;
}
- /** Simplest CDC test with usage of {@link IgniteInternalCache#putAllConflict(Map)}. */
+ /** Test that conflict version is written to WAL. */
@Test
- public void testReadAllKeysFromOtherCluster() throws Exception {
- IgniteConfiguration cfg = getConfiguration("ignite-conflict-resolver");
+ public void testConflictVersionWritten() throws Exception {
+ walProvider = (ctx) -> new FileWriteAheadLogManager(ctx) {
+ @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException {
+ if (rec.type() != DATA_RECORD_V2)
+ return super.log(rec);
- IgniteEx ign = startGrid(cfg);
+ DataRecord dataRec = (DataRecord)rec;
- ign.context().cache().context().versions().dataCenterId(DFLT_CLUSTER_ID);
- ign.cluster().state(ACTIVE);
+ for (int i = 0; i < dataRec.entryCount(); i++) {
+ DataEntry dataEntry = dataRec.writeEntries().get(i);
+
+ assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataEntry.cacheId());
+ assertEquals(DFLT_CLUSTER_ID, dataEntry.writeVersion().dataCenterId());
+ assertNotNull(dataEntry.writeVersion().conflictVersion());
+ assertEquals(OTHER_CLUSTER_ID, dataEntry.writeVersion().conflictVersion().dataCenterId());
+
+ walRecCheckedCntr.incrementAndGet();
+ }
- UserCdcConsumer cnsmr = new UserCdcConsumer() {
- @Override public void checkEvent(CdcEvent evt) {
- assertEquals(DFLT_CLUSTER_ID, evt.version().clusterId());
- assertEquals(OTHER_CLUSTER_ID, evt.version().otherClusterVersion().clusterId());
+ return super.log(rec);
}
};
- IgniteCache<Integer, User> cache = ign.getOrCreateCache(FOR_OTHER_CLUSTER_ID);
+ conflictResolutionMgrSupplier = () -> new CacheVersionConflictResolver() {
+ @Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(
+ CacheObjectValueContext ctx,
+ GridCacheVersionedEntryEx<K1, V1> oldEntry,
+ GridCacheVersionedEntryEx<K1, V1> newEntry,
+ boolean atomicVerComparator
+ ) {
+ GridCacheVersionConflictContext<K1, V1> res =
+ new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
- addAndWaitForConsumption(cnsmr, cfg, cache, null, this::addConflictData, 0, KEYS_CNT, true);
+ res.useNew();
- assertEquals(
- DFLT_CLUSTER_ID,
- ign.context().metric().registry(CACHE_METRICS).<IntMetric>findMetric(DATA_VER_CLUSTER_ID).value()
- );
+ assertEquals(OTHER_CLUSTER_ID, newEntry.version().dataCenterId());
- boolean found = false;
+ if (!oldEntry.isStartVersion())
+ assertEquals(OTHER_CLUSTER_ID, oldEntry.version().dataCenterId());
- SystemView<CacheView> caches = ign.context().systemView().view(CACHES_VIEW);
+ conflictCheckedCntr.incrementAndGet();
- for (CacheView v : caches) {
- if (v.cacheName().equals(FOR_OTHER_CLUSTER_ID)) {
- assertEquals(v.conflictResolver(), "TestCacheConflictResolutionManager");
+ return res;
+ }
- found = true;
+ @Override public String toString() {
+ return "TestCacheConflictResolutionManager";
}
- else
- assertNull(v.conflictResolver());
+ };
+
+ startGrids(gridCnt);
+ IgniteEx cli = startClientGrid(gridCnt);
+
+ for (int i = 0; i < gridCnt; i++) {
+ grid(i).context().cache().context().versions().dataCenterId(DFLT_CLUSTER_ID);
+
+ assertEquals(
+ DFLT_CLUSTER_ID,
+ grid(i).context().metric().registry(CACHE_METRICS).<IntMetric>findMetric(DATA_VER_CLUSTER_ID).value()
+ );
+ }
+
+ cli.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, User> cache = cli.getOrCreateCache(
+ new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME)
+ .setCacheMode(cacheMode)
+ .setAtomicityMode(atomicityMode)
+ .setBackups(Integer.MAX_VALUE));
+
+ if (atomicityMode == ATOMIC)
+ putRemoveCheck(cli, cache, null, null);
+ else {
+ // Check operations for transaction cache without explicit transaction.
+ putRemoveCheck(cli, cache, null, null);
+
+ // Check operations for transaction cache with explicit transaction in all modes.
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values())
+ for (TransactionIsolation isolation : TransactionIsolation.values())
+ putRemoveCheck(cli, cache, concurrency, isolation);
+ }
+
+ for (int i = 0; i < gridCnt; i++) {
+ boolean dfltCacheFound = false;
+
+ assertFalse(grid(i).context().clientNode());
+
+ SystemView<CacheView> caches = grid(i).context().systemView().view(CACHES_VIEW);
+
+ for (CacheView v : caches) {
+ if (v.cacheName().equals(DEFAULT_CACHE_NAME)) {
+ assertEquals(v.conflictResolver(), "TestCacheConflictResolutionManager");
+
+ dfltCacheFound = true;
+ }
+ else
+ assertNull(v.conflictResolver());
+ }
+
+ assertTrue(dfltCacheFound);
}
- assertTrue(found);
}
/** */
- @Test
- public void testOrderIncrease() throws Exception {
- IgniteConfiguration cfg = getConfiguration("ignite-0");
+ private void putRemoveCheck(
+ IgniteEx cli,
+ IgniteCache<Integer, User> cache,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation
+ ) throws Exception {
+ conflictCheckedCntr.set(0);
+ walRecCheckedCntr.set(0);
- IgniteEx ign = startGrid(cfg);
+ // Put data with conflict version.
+ // Conflict version will be checked during WAL record and conflict resolution.
+ addConflictData(cli, cache, 0, KEYS_CNT, concurrency, isolation);
- ign.cluster().state(ACTIVE);
+ checkResolverAndWal();
- AtomicLong updCntr = new AtomicLong(0);
+ // Replacing existing data.
+ addConflictData(cli, cache, 0, KEYS_CNT, concurrency, isolation);
- CdcConsumer cnsmr = new CdcConsumer() {
- private long order = -1;
+ checkResolverAndWal();
- @Override public boolean onEvents(Iterator<CdcEvent> evts) {
- evts.forEachRemaining(evt -> {
- assertEquals(KEY_TO_UPD, evt.key());
+ // Removing existing data with conflict version.
+ removeConflictData(cli, cache, 0, KEYS_CNT, concurrency, isolation);
- assertTrue(evt.version().order() > order);
+ checkResolverAndWal();
+ }
- order = evt.version().order();
+ /** */
+ private void checkResolverAndWal() throws IgniteInterruptedCheckedException {
+ // Conflict resolver for ATOMIC caches is invoked only on primary.
+ long expConflictResolverCnt = atomicityMode == ATOMIC ? KEYS_CNT : (KEYS_CNT * (long)gridCnt);
- updCntr.incrementAndGet();
- });
+ if (!waitForCondition(() -> conflictCheckedCntr.get() == expConflictResolverCnt, WAL_ARCHIVE_TIMEOUT))
+ fail("Expected " + expConflictResolverCnt + " but was " + conflictCheckedCntr.get());
- return true;
- }
+ long expWalRecCnt = (long)KEYS_CNT * gridCnt;
- @Override public void start(MetricRegistry mreg) {
- // No-op.
- }
+ if (!waitForCondition(() -> walRecCheckedCntr.get() == expWalRecCnt, WAL_ARCHIVE_TIMEOUT))
+ fail("Expected " + expWalRecCnt + " but was " + walRecCheckedCntr.get());
+
+ conflictCheckedCntr.set(0);
+ walRecCheckedCntr.set(0);
+ }
- @Override public void stop() {
- // No-op.
+ /** */
+ @Test
+ public void testOrderIncrease() throws Exception {
+ walProvider = (ctx) -> new FileWriteAheadLogManager(ctx) {
+ /** */
+ private long prevOrder = -1;
+
+ @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException {
+ if (rec.type() != DATA_RECORD_V2)
+ return super.log(rec);
+
+
+ DataRecord dataRec = (DataRecord)rec;
+
+ for (int i = 0; i < dataRec.entryCount(); i++) {
+ assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataRec.get(i).cacheId());
+ assertEquals(KEY_TO_UPD, (int)dataRec.get(i).key().value(null, false));
+ assertTrue(dataRec.get(i).writeVersion().order() > prevOrder);
+
+ prevOrder = dataRec.get(i).writeVersion().order();
+
+ walRecCheckedCntr.incrementAndGet();
+ }
+
+ return super.log(rec);
}
};
- CdcMain cdc = createCdc(cnsmr, cfg);
+ IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+ IgniteEx ign = startGrid(cfg);
+
+ ign.cluster().state(ACTIVE);
- IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");
+ IgniteCache<Integer, User> cache = ign.getOrCreateCache(
+ new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(atomicityMode)
+ .setCacheMode(cacheMode));
- IgniteInternalFuture<?> fut = runAsync(cdc);
+ walRecCheckedCntr.set(0);
// Update the same key several times.
// Expect {@link CacheEntryVersion#order()} will monotonically increase.
for (int i = 0; i < KEYS_CNT; i++)
cache.put(KEY_TO_UPD, createUser(i));
- assertTrue(waitForCondition(() -> updCntr.get() == KEYS_CNT, getTestTimeout()));
-
- fut.cancel();
+ assertTrue(waitForCondition(() -> walRecCheckedCntr.get() == KEYS_CNT, getTestTimeout()));
}
/** */
- private void addConflictData(IgniteCache<Integer, User> cache, int from, int to) {
+ private void addConflictData(
+ IgniteEx cli,
+ IgniteCache<Integer, User> cache,
+ int from,
+ int to,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation
+ ) {
try {
- IgniteEx ign = (IgniteEx)G.allGrids().get(0);
-
- IgniteInternalCache<Integer, User> intCache = ign.cachex(cache.getName());
+ IgniteInternalCache<Integer, User> intCache = cli.cachex(cache.getName());
Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
for (int i = from; i < to; i++) {
KeyCacheObject key = new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i));
- CacheObject val =
- new CacheObjectImpl(createUser(i), null);
+ CacheObject val = new CacheObjectImpl(createUser(i), null);
val.prepareMarshal(intCache.context().cacheObjectContext());
drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID)));
}
- intCache.putAllConflict(drMap);
+ if (concurrency != null) {
+ try (Transaction tx = cli.transactions().txStart(concurrency, isolation)) {
+ intCache.putAllConflict(drMap);
+ tx.commit();
+ }
+ }
+ else
+ intCache.putAllConflict(drMap);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -231,30 +410,47 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
}
/** */
- public static class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
+ private void removeConflictData(
+ IgniteEx cli,
+ IgniteCache<Integer, User> cache,
+ int from,
+ int to,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation
+ ) {
+ try {
+ IgniteInternalCache<Integer, User> intCache = cli.cachex(cache.getName());
+
+ Map<KeyCacheObject, GridCacheVersion> drMap = new HashMap<>();
+
+ for (int i = from; i < to; i++) {
+ drMap.put(
+ new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i)),
+ new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID)
+ );
+ }
+
+ if (concurrency != null) {
+ try (Transaction tx = cli.transactions().txStart(concurrency, isolation)) {
+ intCache.removeAllConflict(drMap);
+ tx.commit();
+ }
+ }
+ else
+ intCache.removeAllConflict(drMap);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ public class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
implements CacheConflictResolutionManager<K, V> {
/** {@inheritDoc} */
@Override public CacheVersionConflictResolver conflictResolver() {
- return new CacheVersionConflictResolver() {
- @Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(
- CacheObjectValueContext ctx,
- GridCacheVersionedEntryEx<K1, V1> oldEntry,
- GridCacheVersionedEntryEx<K1, V1> newEntry,
- boolean atomicVerComparator
- ) {
- GridCacheVersionConflictContext<K1, V1> res =
- new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
-
- res.useNew();
-
- return res;
- }
-
- @Override public String toString() {
- return "TestCacheConflictResolutionManager";
- }
- };
+ return conflictResolutionMgrSupplier.get();
}
}
}