Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/03/11 08:16:38 UTC

[GitHub] [ignite] timoninmaxim commented on a change in pull request #9345: IGNITE-15117 CDC for in-memory caches

timoninmaxim commented on a change in pull request #9345:
URL: https://github.com/apache/ignite/pull/9345#discussion_r824425547



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
##########
@@ -1095,6 +1096,14 @@ public boolean persistenceEnabled() {
         return persistenceEnabled;
     }
 
+    /**
+     * @return Persistence or CDC enabled flag.
+     */
+    public boolean cdcEnabled() {

Review comment:
       The name is misleading: the method can return `true` even when CDC is disabled.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
##########
@@ -77,15 +78,18 @@
         cctx.add(new GridMetricManager(cctx));
         cctx.add(new GridSystemViewManager(cctx));
 
+        IgniteWriteAheadLogManager walMgr = new NoOpWALManager();
+
         GridCacheSharedContext<Object, Object> sharedCtx = new GridCacheSharedContext<>(
             cctx,
             null,
             null,
             null,
             new NoOpPageStoreManager(),
-            new NoOpWALManager(),
+            walMgr,
+            walMgr,

Review comment:
       I think we should set `cdcWalMgr` to `null` in most tests.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
##########
@@ -102,7 +103,7 @@ protected CdcMain createCdc(
 
         cdcCfg.setConsumer(cnsmr);
         cdcCfg.setKeepBinary(keepBinary());
-        cdcCfg.setMetricExporterSpi(metricExporters());

Review comment:
       `metricExporters()` is now an obsolete method; let's remove it.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
##########
@@ -92,15 +93,18 @@
         cctx.add(new GridMetricManager(cctx));
         cctx.add(new GridSystemViewManager(cctx));
 
+        IgniteWriteAheadLogManager walMgr = new NoOpWALManager();
+
         GridCacheSharedContext<Object, Object> sharedCtx = new GridCacheSharedContext<>(
             cctx,
             null,
             null,
             null,
             new NoOpPageStoreManager(),
-            new NoOpWALManager(),
+            walMgr,
+            walMgr,

Review comment:
       I think we should set `cdcWalMgr` to `null` in most tests.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
+@RunWith(Parameterized.class)
+public class WalForCdcTest extends GridCommonAbstractTest {
+    /** */
+    private static final int RECORD_COUNT = 10;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode mode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    private boolean persistenceEnabled;
+
+    /** */
+    private boolean cdcEnabled;
+
+    /** */
+    @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED))
+            for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL))
+                params.add(new Object[] {mode, atomicityMode});
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.FSYNC)
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)
+                .setCdcEnabled(cdcEnabled)));
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        cdcEnabled = true;
+        persistenceEnabled = false;
+    }
+
+    /** */
+    @Test
+    public void testOnlyDataRecordWritten() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        // Check only `DataRecords` written in WAL for in-memory cache with CDC enabled.
+        doTestWal(ignite, cache -> {
+            for (int i = 0; i < RECORD_COUNT; i++)
+                cache.put(keyForNode(ignite.affinity(DEFAULT_CACHE_NAME), cntr, ignite.localNode()), i);
+        });
+
+        // Check no WAL written during rebalance.
+        IgniteEx ignite1 = startGrid(1);
+
+        awaitPartitionMapExchange(false, true, null);
+
+        // Can't use `waitForCondition` because if the test passes
+        // then no `DataRecords` are logged and therefore no segment is archived.
+        Thread.sleep(3 * WAL_ARCHIVE_TIMEOUT);
+
+        int walRecCnt = checkDataRecords(ignite1);
+
+        assertEquals(0, walRecCnt);
+
+        // Check `DataRecords` written on second node after rebalance.
+        doTestWal(ignite1, cache -> {
+            for (int i = 0; i < RECORD_COUNT; i++)
+                cache.put(keyForNode(ignite1.affinity(DEFAULT_CACHE_NAME), cntr, ignite1.localNode()), i);
+        });
+    }
+
+    /** */
+    @Test
+    public void testWalDisable() throws Exception {
+        persistenceEnabled = true;
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        doTestWal(ignite, cache -> {
+            for (int i = 0; i < RECORD_COUNT / 2; i++)
+                cache.put(i, i);
+
+            ignite.cluster().disableWal(DEFAULT_CACHE_NAME);
+
+            for (int i = 0; i < RECORD_COUNT; i++)
+                cache.put(i, i);
+
+            ignite.cluster().enableWal(DEFAULT_CACHE_NAME);
+
+            for (int i = RECORD_COUNT / 2; i < RECORD_COUNT; i++)
+                cache.put(i, i);
+        });
+    }
+
+    /** */
+    @Test
+    public void testWalDisabledIfPersistenceAndCdcDisabled() throws Exception {
+        persistenceEnabled = false;
+        cdcEnabled = false;
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        ignite.getOrCreateCache(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+                .setCacheMode(mode)
+                .setAtomicityMode(atomicityMode));
+
+        assertNull(ignite.context().cache().context().wal());
+        assertNull(getFieldValue(ignite.context().cache().context(), "cdcWalMgr"));
+    }
+
+    /** */
+    private void doTestWal(IgniteEx ignite, Consumer<IgniteCache<Integer, Integer>> putData) throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+                .setCacheMode(mode)
+                .setAtomicityMode(atomicityMode));
+
+        long archiveIdx = ignite.context().cache().context().wal(true).lastArchivedSegment();

Review comment:
       WDYT about adding a default method `wal()` that delegates to `wal(false)`?
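
A minimal sketch of that overload, using hypothetical stand-in types rather than the real Ignite classes. `WalManager`, `SharedContext`, and `context(...)` are invented for illustration, and the meaning assigned to `forCdc` here (fall back to the CDC-only manager when there is no regular one) is an assumption, not something this PR confirms.

```java
public class WalAccessorSketch {
    /** Hypothetical stand-in for a WAL manager; not the Ignite interface. */
    interface WalManager {
        String name();
    }

    /** Hypothetical stand-in for the shared cache context. */
    interface SharedContext {
        /** Assumed semantics: with forCdc == true a CDC-only manager may be returned. */
        WalManager wal(boolean forCdc);

        /** Default overload, so existing callers do not need to pass a flag. */
        default WalManager wal() {
            return wal(false);
        }
    }

    /** Builds a context from a regular WAL manager and a CDC-only one (either may be null). */
    static SharedContext context(WalManager walMgr, WalManager cdcWalMgr) {
        return forCdc -> {
            if (walMgr != null)
                return walMgr;

            return forCdc ? cdcWalMgr : null;
        };
    }

    public static void main(String[] args) {
        // In-memory node with CDC: no regular WAL manager, only the CDC one.
        SharedContext inMemoryWithCdc = context(null, () -> "cdc-wal");

        System.out.println(inMemoryWithCdc.wal() == null);
        System.out.println(inMemoryWithCdc.wal(true).name());
    }
}
```

With this shape, call sites that never deal with CDC keep calling `wal()`, and only the CDC-aware paths need the explicit flag.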

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
##########
@@ -0,0 +1,250 @@
+/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
+@RunWith(Parameterized.class)
+public class WalForCdcTest extends GridCommonAbstractTest {
+    /** */
+    private static final int RECORD_COUNT = 10;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode mode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    private boolean persistenceEnabled;
+
+    /** */
+    private boolean cdcEnabled;
+
+    /** */
+    @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED))
+            for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL))
+                params.add(new Object[] {mode, atomicityMode});
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.FSYNC)

Review comment:
       Should we test it with other modes too?
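
One way this could look, sketched with local enums instead of the Ignite ones so that it runs on its own. The enum names mirror `CacheMode`, `CacheAtomicityMode`, and `WALMode`, but the class and the choice to skip `NONE` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WalModeParamsSketch {
    /** Local stand-ins for the Ignite enums; only the constants needed here. */
    enum CacheMode { REPLICATED, PARTITIONED }

    enum AtomicityMode { ATOMIC, TRANSACTIONAL }

    /** NONE is left out on the assumption that nothing is logged in that mode. */
    enum WalMode { FSYNC, LOG_ONLY, BACKGROUND }

    /** Builds the full cross product, in the shape JUnit 4 Parameterized expects. */
    static List<Object[]> parameters() {
        List<Object[]> params = new ArrayList<>();

        for (CacheMode mode : CacheMode.values())
            for (AtomicityMode atomicityMode : AtomicityMode.values())
                for (WalMode walMode : WalMode.values())
                    params.add(new Object[] {mode, atomicityMode, walMode});

        return params;
    }

    public static void main(String[] args) {
        for (Object[] p : parameters())
            System.out.println(Arrays.toString(p));
    }
}
```

In the real test this would mean a third `@Parameterized.Parameter(2)` field that is passed to `setWalMode(...)`, at the cost of three times as many runs.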

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
##########
@@ -879,7 +879,7 @@ public void testSwitchHistoricalRebalanceToFullWhileIteratingOverWAL() throws Ex
             supplier1 -> {
                 try {
                     // Corrupt wal record in order to fail historical rebalance from supplier1 node.
-                    IgniteWriteAheadLogManager walMgr = supplier1.context().cache().context().wal();
+                    IgniteWriteAheadLogManager walMgr = supplier1.context().cache().context().wal(true);

Review comment:
       Let's parameterize this test, or not test CDC here at all.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
##########
@@ -76,15 +77,18 @@
         cctx.add(new GridMetricManager(cctx));
         cctx.add(new GridSystemViewManager(cctx));
 
+        IgniteWriteAheadLogManager walMgr = new NoOpWALManager();
+
         GridCacheSharedContext<Object, Object> sharedCtx = new GridCacheSharedContext<>(
             cctx,
             null,
             null,
             null,
             new NoOpPageStoreManager(),
-            new NoOpWALManager(),
+            walMgr,
+            walMgr,

Review comment:
       I think we should set `cdcWalMgr` to `null` in most tests.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
##########
@@ -463,6 +464,7 @@ private void checkSwitchReadingSegmentDuringIteration(int serVer) throws Excepti
             null,
             null,
             walMgr,
+            walMgr,

Review comment:
       It looks like this test could be parameterized by whether CDC is enabled, WDYT? Otherwise, we should not test CDC here at all.

##########
File path: modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
##########
@@ -0,0 +1,250 @@
+/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
+@RunWith(Parameterized.class)
+public class WalForCdcTest extends GridCommonAbstractTest {
+    /** */
+    private static final int RECORD_COUNT = 10;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode mode;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    private boolean persistenceEnabled;
+
+    /** */
+    private boolean cdcEnabled;
+
+    /** */
+    @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED))
+            for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL))
+                params.add(new Object[] {mode, atomicityMode});
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.FSYNC)
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)
+                .setCdcEnabled(cdcEnabled)));
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        cdcEnabled = true;
+        persistenceEnabled = false;
+    }
+
+    /** */
+    @Test
+    public void testOnlyDataRecordWritten() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        AtomicInteger cntr = new AtomicInteger();
+
+        // Check only `DataRecords` written in WAL for in-memory cache with CDC enabled.
+        doTestWal(ignite, cache -> {
+            for (int i = 0; i < RECORD_COUNT; i++)
+                cache.put(keyForNode(ignite.affinity(DEFAULT_CACHE_NAME), cntr, ignite.localNode()), i);
+        });
+
+        // Check no WAL written during rebalance.
+        IgniteEx ignite1 = startGrid(1);
+
+        awaitPartitionMapExchange(false, true, null);
+
+        // Can't use `waitForCondition` because if the test passes
+        // then no `DataRecords` are logged and therefore no segment is archived.
+        Thread.sleep(3 * WAL_ARCHIVE_TIMEOUT);
+
+        int walRecCnt = checkDataRecords(ignite1);
+
+        assertEquals(0, walRecCnt);
+
+        // Check `DataRecords` written on second node after rebalance.
+        doTestWal(ignite1, cache -> {

Review comment:
       Let's add a check for ignite0 too, so that we test that rebalance doesn't break CDC on existing nodes.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
##########
@@ -1213,8 +1213,8 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) {
                 counters.accumulateSizeDelta(cctx.cacheId(), partition(), -1);
             }
 
-            if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) {
-                logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
+            if (cctx.group().cdcEnabled() && cctx.group().walEnabled()) {
+                logPtr = cctx.shared().wal(true).log(new MvccDataRecord(new MvccDataEntry(

Review comment:
       Let's replace `wal(true)` with `wal(cctx.group().cdcEnabled())`.
   
   Otherwise, it's a bit misleading to set the `forCdc` flag to `true` when CDC is actually disabled.
   
   Also, `cctx.group().cdcEnabled()` should return only whether CDC is enabled, and shouldn't check the persistence flag internally.
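
A rough sketch of that separation, with hypothetical names throughout. `GroupFlags`, `logsDataRecords()`, and `walCall(...)` do not exist in Ignite; they only illustrate keeping the two flags apart and deriving the `forCdc` argument from the CDC flag alone.

```java
public class CdcFlagSketch {
    /** Hypothetical stand-in for the flags kept by a cache group context. */
    static final class GroupFlags {
        private final boolean persistenceEnabled;
        private final boolean cdcEnabled;

        GroupFlags(boolean persistenceEnabled, boolean cdcEnabled) {
            this.persistenceEnabled = persistenceEnabled;
            this.cdcEnabled = cdcEnabled;
        }

        boolean persistenceEnabled() {
            return persistenceEnabled;
        }

        /** Reports CDC only; the persistence flag is not consulted. */
        boolean cdcEnabled() {
            return cdcEnabled;
        }

        /** Hypothetical combined check for "data records must be logged". */
        boolean logsDataRecords() {
            return persistenceEnabled || cdcEnabled;
        }
    }

    /** Describes which WAL accessor a call site would use for the given group. */
    static String walCall(GroupFlags grp, boolean walEnabled) {
        if (!grp.logsDataRecords() || !walEnabled)
            return "skip";

        return "wal(" + grp.cdcEnabled() + ")";
    }

    public static void main(String[] args) {
        System.out.println(walCall(new GroupFlags(true, false), true));
        System.out.println(walCall(new GroupFlags(false, true), true));
        System.out.println(walCall(new GroupFlags(false, false), true));
    }
}
```

The combined check gets its own name, so `cdcEnabled()` never answers `true` for a group that only has persistence turned on.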

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
##########
@@ -801,17 +802,32 @@ else if (op == READ) {
 
                         cctx.mvccCaching().onTxFinished(this, true);
 
-                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) {
+                        boolean walEnabled = false;
+
+                        // Log only if there is at least one persistent or CDC-enabled group.
+                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) {
+                            for (int i = 0; i < dataEntries.size(); i++) {
+                                CacheGroupContext grpCtx = dataEntries.get(i).get2().context().group();
+
+                                if (grpCtx.cdcEnabled() && grpCtx.walEnabled()) {

Review comment:
       Previous logic didn't check `grpCtx.walEnabled()`. Why is it needed now?



