You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/02/22 15:32:34 UTC

[ignite] branch master updated: IGNITE-16587 Fix putAllConflict, removeAllConflict for tx caches (#9836)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b81346  IGNITE-16587 Fix putAllConflict, removeAllConflict for tx caches (#9836)
4b81346 is described below

commit 4b813466bbf01d5335da7f25d7c1ffe8e08f51a6
Author: Nikolay <ni...@apache.org>
AuthorDate: Tue Feb 22 18:31:56 2022 +0300

    IGNITE-16587 Fix putAllConflict, removeAllConflict for tx caches (#9836)
---
 .../processors/cache/GridCacheMapEntry.java        |  18 +-
 .../GridDistributedTxRemoteAdapter.java            |   3 +-
 .../dht/colocated/GridDhtDetachedCacheEntry.java   |   8 +-
 .../cache/distributed/near/GridNearCacheEntry.java |   8 +-
 .../cache/distributed/near/GridNearTxLocal.java    |  28 +-
 .../cache/version/GridCacheVersionEx.java          |  14 +-
 .../org/apache/ignite/cdc/CdcCacheVersionTest.java | 380 ++++++++++++++++-----
 7 files changed, 338 insertions(+), 121 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 456f652..951b41c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -79,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -129,6 +128,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_MAX_SNAPSHOT;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compareIgnoreOpCounter;
 import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY;
+import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx.addConflictVersion;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 
 /**
@@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             updateCntr0 = nextPartitionCounter(tx, updateCntr);
 
             if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = logTxUpdate(tx, val, expireTime, updateCntr0);
+                logPtr = logTxUpdate(tx, val, addConflictVersion(tx.writeVersion(), newVer), expireTime, updateCntr0);
 
             update(val, expireTime, ttl, newVer, true);
 
@@ -1787,7 +1787,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             updateCntr0 = nextPartitionCounter(tx, updateCntr);
 
             if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = logTxUpdate(tx, null, 0, updateCntr0);
+                logPtr = logTxUpdate(tx, null, addConflictVersion(tx.writeVersion(), newVer), 0, updateCntr0);
 
             drReplicate(drType, null, newVer, topVer);
 
@@ -4352,6 +4352,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /**
      * @param tx Transaction.
      * @param val Value.
+     * @param writeVer New entry version.
      * @param expireTime Expire time (or 0 if not applicable).
      * @param updCntr Update counter.
      * @throws IgniteCheckedException In case of log failure.
@@ -4359,6 +4360,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     protected WALPointer logTxUpdate(
         IgniteInternalTx tx,
         CacheObject val,
+        GridCacheVersion writeVer,
         long expireTime,
         long updCntr
     ) throws IgniteCheckedException {
@@ -4377,7 +4379,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 val,
                 op,
                 tx.nearXidVersion(),
-                tx.writeVersion(),
+                writeVer,
                 expireTime,
                 key.partition(),
                 updCntr,
@@ -6229,13 +6231,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             // Incorporate conflict version into new version if needed.
-            if (conflictVer != null && conflictVer != newVer) {
-                newVer = new GridCacheVersionEx(newVer.topologyVersion(),
-                    newVer.order(),
-                    newVer.nodeOrder(),
-                    newVer.dataCenterId(),
-                    conflictVer);
-            }
+            newVer = addConflictVersion(newVer, conflictVer);
 
             if (op == UPDATE) {
                 assert writeObj != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 89be03f..72be779 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -88,6 +88,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REL
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
+import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx.addConflictVersion;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -628,7 +629,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                     val,
                                                     op,
                                                     nearXidVersion(),
-                                                    writeVersion(),
+                                                    addConflictVersion(writeVersion(), txEntry.conflictVersion()),
                                                     0,
                                                     txEntry.key().partition(),
                                                     txEntry.updateCounter(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 7c66afb..64fbb03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -85,7 +85,13 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr) {
+    @Override protected WALPointer logTxUpdate(
+        IgniteInternalTx tx,
+        CacheObject val,
+        GridCacheVersion writeVer,
+        long expireTime,
+        long updCntr
+    ) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index d5d201e..1eee3f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -473,7 +473,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr) {
+    @Override protected WALPointer logTxUpdate(
+        IgniteInternalTx tx,
+        CacheObject val,
+        GridCacheVersion writeVer,
+        long expireTime,
+        long updCntr
+    ) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 733ec86..cbea2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1253,20 +1253,20 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     else if (drRmvMap != null) {
                         assert drRmvMap.get(key) != null;
 
-                    drVer = drRmvMap.get(key);
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-                else if (dataCenterId != null) {
-                    drVer = cacheCtx.cache().nextVersion(dataCenterId);
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
-                else {
-                    drVer = null;
-                    drTtl = -1L;
-                    drExpireTime = -1L;
-                }
+                        drVer = drRmvMap.get(key);
+                        drTtl = -1L;
+                        drExpireTime = -1L;
+                    }
+                    else if (dataCenterId != null) {
+                        drVer = cacheCtx.cache().nextVersion(dataCenterId);
+                        drTtl = -1L;
+                        drExpireTime = -1L;
+                    }
+                    else {
+                        drVer = null;
+                        drTtl = -1L;
+                        drExpireTime = -1L;
+                    }
 
                     if (!rmv && val == null && entryProcessor == null) {
                         setRollbackOnly();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
index 4848893..5a08a7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
@@ -52,7 +52,7 @@ public class GridCacheVersionEx extends GridCacheVersion {
      * @param dataCenterId Data center ID.
      * @param drVer DR version.
      */
-    public GridCacheVersionEx(int topVer, long order, int nodeOrder, byte dataCenterId,
+    private GridCacheVersionEx(int topVer, long order, int nodeOrder, byte dataCenterId,
         GridCacheVersion drVer) {
         super(topVer, order, nodeOrder, dataCenterId);
 
@@ -171,4 +171,16 @@ public class GridCacheVersionEx extends GridCacheVersion {
             ", dataCenterId=" + dataCenterId() +
             ", drVer=" + drVer + ']';
     }
+
+    /** @return {@code ver} as is if {@code conflictVer} is null or the same object, else {@code ver} with {@code conflictVer} added. */
+    public static GridCacheVersion addConflictVersion(GridCacheVersion ver, GridCacheVersion conflictVer) {
+        if (conflictVer == null || conflictVer == ver)
+            return ver;
+
+        return new GridCacheVersionEx(ver.topologyVersion(),
+            ver.order(),
+            ver.nodeOrder(),
+            ver.dataCenterId(),
+            conflictVer);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
index efea107..8d830c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
@@ -17,19 +17,31 @@
 
 package org.apache.ignite.cdc;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
@@ -39,35 +51,44 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
-import org.apache.ignite.internal.processors.metric.MetricRegistry;
-import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.plugin.AbstractCachePluginProvider;
 import org.apache.ignite.plugin.AbstractTestPluginProvider;
 import org.apache.ignite.plugin.CachePluginContext;
 import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.PluginContext;
 import org.apache.ignite.spi.metric.IntMetric;
 import org.apache.ignite.spi.systemview.view.CacheView;
 import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.processors.cache.CacheMetricsImpl.CACHE_METRICS;
 import static org.apache.ignite.internal.processors.cache.ClusterCachesInfo.CACHES_VIEW;
 import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager.DATA_VER_CLUSTER_ID;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /** */
+@RunWith(Parameterized.class)
 public class CdcCacheVersionTest extends AbstractCdcTest {
     /** */
-    public static final String FOR_OTHER_CLUSTER_ID = "for-other-cluster-id";
-
-    /** */
     public static final byte DFLT_CLUSTER_ID = 1;
 
     /** */
@@ -76,6 +97,43 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
     /** */
     public static final int KEY_TO_UPD = 42;
 
+    /** */
+    @Parameterized.Parameter
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public CacheMode cacheMode;
+
+    /** */
+    @Parameterized.Parameter(2)
+    public int gridCnt;
+
+    /** */
+    private final AtomicLong walRecCheckedCntr = new AtomicLong();
+
+    /** */
+    private final AtomicLong conflictCheckedCntr = new AtomicLong();
+
+    /** */
+    private volatile Function<GridKernalContext, IgniteWriteAheadLogManager> walProvider;
+
+    /** */
+    private volatile Supplier<CacheVersionConflictResolver> conflictResolutionMgrSupplier;
+
+    /** */
+    @Parameterized.Parameters(name = "atomicity={0}, mode={1}, gridCnt={2}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED))
+                for (int gridCnt : new int[] {1, 3})
+                    params.add(new Object[] {atomicity, mode, gridCnt});
+
+        return params;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -91,139 +149,260 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
             }
 
             @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
-                if (!ctx.igniteCacheConfiguration().getName().equals(FOR_OTHER_CLUSTER_ID))
+                if (!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME))
                     return null;
 
                 return new AbstractCachePluginProvider() {
                     @Override public @Nullable Object createComponent(Class cls) {
-                        if (cls != CacheConflictResolutionManager.class)
+                        if (cls != CacheConflictResolutionManager.class || conflictResolutionMgrSupplier == null)
                             return null;
 
-                        return new TestCacheConflictResolutionManager();
+                        return new TestCacheConflictResolutionManager<>();
                     }
                 };
             }
+
+            @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+                if (IgniteWriteAheadLogManager.class.equals(cls))
+                    return (T)walProvider.apply(((IgniteEx)ctx.grid()).context());
+
+                return null;
+            }
         });
 
         return cfg;
     }
 
-    /** Simplest CDC test with usage of {@link IgniteInternalCache#putAllConflict(Map)}. */
+    /** Test that conflict version is written to WAL. */
     @Test
-    public void testReadAllKeysFromOtherCluster() throws Exception {
-        IgniteConfiguration cfg = getConfiguration("ignite-conflict-resolver");
+    public void testConflictVersionWritten() throws Exception {
+        walProvider = (ctx) -> new FileWriteAheadLogManager(ctx) {
+            @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException {
+                if (rec.type() != DATA_RECORD_V2)
+                    return super.log(rec);
 
-        IgniteEx ign = startGrid(cfg);
+                DataRecord dataRec = (DataRecord)rec;
 
-        ign.context().cache().context().versions().dataCenterId(DFLT_CLUSTER_ID);
-        ign.cluster().state(ACTIVE);
+                for (int i = 0; i < dataRec.entryCount(); i++) {
+                    DataEntry dataEntry = dataRec.writeEntries().get(i);
+
+                    assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataEntry.cacheId());
+                    assertEquals(DFLT_CLUSTER_ID, dataEntry.writeVersion().dataCenterId());
+                    assertNotNull(dataEntry.writeVersion().conflictVersion());
+                    assertEquals(OTHER_CLUSTER_ID, dataEntry.writeVersion().conflictVersion().dataCenterId());
+
+                    walRecCheckedCntr.incrementAndGet();
+                }
 
-        UserCdcConsumer cnsmr = new UserCdcConsumer() {
-            @Override public void checkEvent(CdcEvent evt) {
-                assertEquals(DFLT_CLUSTER_ID, evt.version().clusterId());
-                assertEquals(OTHER_CLUSTER_ID, evt.version().otherClusterVersion().clusterId());
+                return super.log(rec);
             }
         };
 
-        IgniteCache<Integer, User> cache = ign.getOrCreateCache(FOR_OTHER_CLUSTER_ID);
+        conflictResolutionMgrSupplier = () -> new CacheVersionConflictResolver() {
+            @Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(
+                CacheObjectValueContext ctx,
+                GridCacheVersionedEntryEx<K1, V1> oldEntry,
+                GridCacheVersionedEntryEx<K1, V1> newEntry,
+                boolean atomicVerComparator
+            ) {
+                GridCacheVersionConflictContext<K1, V1> res =
+                    new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
 
-        addAndWaitForConsumption(cnsmr, cfg, cache, null, this::addConflictData, 0, KEYS_CNT, true);
+                res.useNew();
 
-        assertEquals(
-            DFLT_CLUSTER_ID,
-            ign.context().metric().registry(CACHE_METRICS).<IntMetric>findMetric(DATA_VER_CLUSTER_ID).value()
-        );
+                assertEquals(OTHER_CLUSTER_ID, newEntry.version().dataCenterId());
 
-        boolean found = false;
+                if (!oldEntry.isStartVersion())
+                    assertEquals(OTHER_CLUSTER_ID, oldEntry.version().dataCenterId());
 
-        SystemView<CacheView> caches = ign.context().systemView().view(CACHES_VIEW);
+                conflictCheckedCntr.incrementAndGet();
 
-        for (CacheView v : caches) {
-            if (v.cacheName().equals(FOR_OTHER_CLUSTER_ID)) {
-                assertEquals(v.conflictResolver(), "TestCacheConflictResolutionManager");
+                return res;
+            }
 
-                found = true;
+            @Override public String toString() {
+                return "TestCacheConflictResolutionManager";
             }
-            else
-                assertNull(v.conflictResolver());
+        };
+
+        startGrids(gridCnt);
+        IgniteEx cli = startClientGrid(gridCnt);
+
+        for (int i = 0; i < gridCnt; i++) {
+            grid(i).context().cache().context().versions().dataCenterId(DFLT_CLUSTER_ID);
+
+            assertEquals(
+                DFLT_CLUSTER_ID,
+                grid(i).context().metric().registry(CACHE_METRICS).<IntMetric>findMetric(DATA_VER_CLUSTER_ID).value()
+            );
+        }
+
+        cli.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, User> cache = cli.getOrCreateCache(
+            new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME)
+                .setCacheMode(cacheMode)
+                .setAtomicityMode(atomicityMode)
+                .setBackups(Integer.MAX_VALUE));
+
+        if (atomicityMode == ATOMIC)
+            putRemoveCheck(cli, cache, null, null);
+        else {
+            // Check operations for a transactional cache without an explicit transaction.
+            putRemoveCheck(cli, cache, null, null);
+
+            // Check operations for a transactional cache with an explicit transaction in all concurrency/isolation modes.
+            for (TransactionConcurrency concurrency : TransactionConcurrency.values())
+                for (TransactionIsolation isolation : TransactionIsolation.values())
+                    putRemoveCheck(cli, cache, concurrency, isolation);
+        }
+
+        for (int i = 0; i < gridCnt; i++) {
+            boolean dfltCacheFound = false;
+
+            assertFalse(grid(i).context().clientNode());
+
+            SystemView<CacheView> caches = grid(i).context().systemView().view(CACHES_VIEW);
+
+            for (CacheView v : caches) {
+                if (v.cacheName().equals(DEFAULT_CACHE_NAME)) {
+                    assertEquals(v.conflictResolver(), "TestCacheConflictResolutionManager");
+
+                    dfltCacheFound = true;
+                }
+                else
+                    assertNull(v.conflictResolver());
+            }
+
+            assertTrue(dfltCacheFound);
         }
 
-        assertTrue(found);
     }
 
     /** */
-    @Test
-    public void testOrderIncrease() throws Exception {
-        IgniteConfiguration cfg = getConfiguration("ignite-0");
+    private void putRemoveCheck(
+        IgniteEx cli,
+        IgniteCache<Integer, User> cache,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation
+    ) throws Exception {
+        conflictCheckedCntr.set(0);
+        walRecCheckedCntr.set(0);
 
-        IgniteEx ign = startGrid(cfg);
+        // Put data with conflict version.
+        // The conflict version is checked when the WAL record is logged and during conflict resolution.
+        addConflictData(cli, cache, 0, KEYS_CNT, concurrency, isolation);
 
-        ign.cluster().state(ACTIVE);
+        checkResolverAndWal();
 
-        AtomicLong updCntr = new AtomicLong(0);
+        // Replacing existing data.
+        addConflictData(cli, cache, 0, KEYS_CNT, concurrency, isolation);
 
-        CdcConsumer cnsmr = new CdcConsumer() {
-            private long order = -1;
+        checkResolverAndWal();
 
-            @Override public boolean onEvents(Iterator<CdcEvent> evts) {
-                evts.forEachRemaining(evt -> {
-                    assertEquals(KEY_TO_UPD, evt.key());
+        // Removing existing data with conflict version.
+        removeConflictData(cli, cache, 0, KEYS_CNT, concurrency, isolation);
 
-                    assertTrue(evt.version().order() > order);
+        checkResolverAndWal();
+    }
 
-                    order = evt.version().order();
+    /** */
+    private void checkResolverAndWal() throws IgniteInterruptedCheckedException {
+        // Conflict resolver for ATOMIC caches is invoked only on the primary node.
+        long expConflictResolverCnt = atomicityMode == ATOMIC ? KEYS_CNT : (KEYS_CNT * (long)gridCnt);
 
-                    updCntr.incrementAndGet();
-                });
+        if (!waitForCondition(() -> conflictCheckedCntr.get() == expConflictResolverCnt, WAL_ARCHIVE_TIMEOUT))
+            fail("Expected " + expConflictResolverCnt + " but was " + conflictCheckedCntr.get());
 
-                return true;
-            }
+        long expWalRecCnt = (long)KEYS_CNT * gridCnt;
 
-            @Override public void start(MetricRegistry mreg) {
-                // No-op.
-            }
+        if (!waitForCondition(() -> walRecCheckedCntr.get() == expWalRecCnt, WAL_ARCHIVE_TIMEOUT))
+            fail("Expected " + expWalRecCnt + " but was " + walRecCheckedCntr.get());
+
+        conflictCheckedCntr.set(0);
+        walRecCheckedCntr.set(0);
+    }
 
-            @Override public void stop() {
-                // No-op.
+    /** */
+    @Test
+    public void testOrderIncrease() throws Exception {
+        walProvider = (ctx) -> new FileWriteAheadLogManager(ctx) {
+            /** */
+            private long prevOrder = -1;
+
+            @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException {
+                if (rec.type() != DATA_RECORD_V2)
+                    return super.log(rec);
+
+
+                DataRecord dataRec = (DataRecord)rec;
+
+                for (int i = 0; i < dataRec.entryCount(); i++) {
+                    assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataRec.get(i).cacheId());
+                    assertEquals(KEY_TO_UPD, (int)dataRec.get(i).key().value(null, false));
+                    assertTrue(dataRec.get(i).writeVersion().order() > prevOrder);
+
+                    prevOrder = dataRec.get(i).writeVersion().order();
+
+                    walRecCheckedCntr.incrementAndGet();
+                }
+
+                return super.log(rec);
             }
         };
 
-        CdcMain cdc = createCdc(cnsmr, cfg);
+        IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+        IgniteEx ign = startGrid(cfg);
+
+        ign.cluster().state(ACTIVE);
 
-        IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");
+        IgniteCache<Integer, User> cache = ign.getOrCreateCache(
+            new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(atomicityMode)
+                .setCacheMode(cacheMode));
 
-        IgniteInternalFuture<?> fut = runAsync(cdc);
+        walRecCheckedCntr.set(0);
 
         // Update the same key several time.
         // Expect {@link CacheEntryVersion#order()} will monotically increase.
         for (int i = 0; i < KEYS_CNT; i++)
             cache.put(KEY_TO_UPD, createUser(i));
 
-        assertTrue(waitForCondition(() -> updCntr.get() == KEYS_CNT, getTestTimeout()));
-
-        fut.cancel();
+        assertTrue(waitForCondition(() -> walRecCheckedCntr.get() == KEYS_CNT, getTestTimeout()));
     }
 
     /** */
-    private void addConflictData(IgniteCache<Integer, User> cache, int from, int to) {
+    private void addConflictData(
+        IgniteEx cli,
+        IgniteCache<Integer, User> cache,
+        int from,
+        int to,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation
+    ) {
         try {
-            IgniteEx ign = (IgniteEx)G.allGrids().get(0);
-
-            IgniteInternalCache<Integer, User> intCache = ign.cachex(cache.getName());
+            IgniteInternalCache<Integer, User> intCache = cli.cachex(cache.getName());
 
             Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
 
             for (int i = from; i < to; i++) {
                 KeyCacheObject key = new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i));
-                CacheObject val =
-                    new CacheObjectImpl(createUser(i), null);
+                CacheObject val = new CacheObjectImpl(createUser(i), null);
 
                 val.prepareMarshal(intCache.context().cacheObjectContext());
 
                 drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID)));
             }
 
-            intCache.putAllConflict(drMap);
+            if (concurrency != null) {
+                try (Transaction tx = cli.transactions().txStart(concurrency, isolation)) {
+                    intCache.putAllConflict(drMap);
+                    tx.commit();
+                }
+            }
+            else
+                intCache.putAllConflict(drMap);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -231,30 +410,47 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
     }
 
     /** */
-    public static class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
+    private void removeConflictData(
+        IgniteEx cli,
+        IgniteCache<Integer, User> cache,
+        int from,
+        int to,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation
+    ) {
+        try {
+            IgniteInternalCache<Integer, User> intCache = cli.cachex(cache.getName());
+
+            Map<KeyCacheObject, GridCacheVersion> drMap = new HashMap<>();
+
+            for (int i = from; i < to; i++) {
+                drMap.put(
+                    new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i)),
+                    new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID)
+                );
+            }
+
+            if (concurrency != null) {
+                try (Transaction tx = cli.transactions().txStart(concurrency, isolation)) {
+                    intCache.removeAllConflict(drMap);
+                    tx.commit();
+                }
+            }
+            else
+                intCache.removeAllConflict(drMap);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    public class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V>
         implements CacheConflictResolutionManager<K, V> {
 
         /** {@inheritDoc} */
         @Override public CacheVersionConflictResolver conflictResolver() {
-            return new CacheVersionConflictResolver() {
-                @Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(
-                    CacheObjectValueContext ctx,
-                    GridCacheVersionedEntryEx<K1, V1> oldEntry,
-                    GridCacheVersionedEntryEx<K1, V1> newEntry,
-                    boolean atomicVerComparator
-                ) {
-                    GridCacheVersionConflictContext<K1, V1> res =
-                        new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
-
-                    res.useNew();
-
-                    return res;
-                }
-
-                @Override public String toString() {
-                    return "TestCacheConflictResolutionManager";
-                }
-            };
+            return conflictResolutionMgrSupplier.get();
         }
     }
 }