You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2023/07/19 09:37:02 UTC

[ignite] branch IGNITE-17700__realtime_cdc updated: IGNITE-19622 Collect to buffer only data records (#10845)

This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch IGNITE-17700__realtime_cdc
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17700__realtime_cdc by this push:
     new b609117e168 IGNITE-19622 Collect to buffer only data records (#10845)
b609117e168 is described below

commit b609117e168bb377859982428a1343db36286063
Author: Maksim Timonin <ti...@apache.org>
AuthorDate: Wed Jul 19 12:36:53 2023 +0300

    IGNITE-19622 Collect to buffer only data records (#10845)
---
 .../cache/persistence/cdc/CdcBuffer.java           |   9 +-
 .../cache/persistence/cdc/CdcBufferConsumer.java   |   7 +-
 .../cache/persistence/cdc/CdcProcessor.java        |  72 +++++++++--
 .../cache/persistence/cdc/CdcWorker.java           | 110 +++++++++++++----
 .../persistence/wal/ByteBufferWalIterator.java     |  55 ++++-----
 .../persistence/wal/FileWriteAheadLogManager.java  |   3 +
 .../wal/filehandle/FileHandleManagerImpl.java      |   2 +
 .../wal/filehandle/FileWriteHandleImpl.java        |   1 +
 .../wal/filehandle/FsyncFileWriteHandle.java       |   1 +
 .../persistence/cdc/RealtimeCdcBufferTest.java     | 131 ++++++++++-----------
 .../persistence/wal/ByteBufferWalIteratorTest.java |  30 ++---
 11 files changed, 274 insertions(+), 147 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
index 3f7b3ef8722..aa7785cf7e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
@@ -62,6 +62,9 @@ public class CdcBuffer {
     public boolean offer(ByteBuffer data) {
         int bufSize = data.limit() - data.position();
 
+        if (bufSize == 0)
+            return true;
+
         if (size.addAndGet(bufSize) > maxSize) {
             overflowed = true;
 
@@ -72,7 +75,11 @@ public class CdcBuffer {
 
         data.get(cp, 0, bufSize);
 
-        LinkedNode newNode = new LinkedNode(ByteBuffer.wrap(cp));
+        ByteBuffer dataBuf = ByteBuffer
+            .wrap(cp)
+            .order(data.order());
+
+        LinkedNode newNode = new LinkedNode(dataBuf);
         LinkedNode oldNode = producerNode;
 
         producerNode = newNode;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
index 5e96fa2c98f..c75ee9b57ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
@@ -17,17 +17,18 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.cdc;
 
-import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.cdc.CdcEvent;
 
 /** Mock for Realtime CDC buffer consumer. */
 public interface CdcBufferConsumer {
     /**
      * Consumes raw WAL data.
      *
-     * @param data Raw data to consume.
+     * @param cdcEvents CDC events to consume.
      * @return {@code True} if current offset in WAL should be commited.
      */
-    public boolean consume(ByteBuffer data);
+    public boolean consume(Collection<CdcEvent> cdcEvents);
 
     /** */
     public void close();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
index 9e5cb0dc4d8..2e8975fb1ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
@@ -18,13 +18,31 @@
 package org.apache.ignite.internal.processors.cache.persistence.cdc;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferWalIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
 
 /** CDC processor responsible for collecting data changes in realtime within Ignite node. */
 public class CdcProcessor {
+    /** Serializer latest version to use. TODO: get from WAL manager? */
+    private final int serializerVer =
+        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
+
     /** Buffer to store collected data. */
     private final CdcBuffer cdcBuf;
 
@@ -34,13 +52,24 @@ public class CdcProcessor {
     /** Ignite log. */
     private final IgniteLogger log;
 
+    /** */
+    private final RecordSerializer serializer;
+
     /** Whether CDC is enabled. Disables after {@link #cdcBuf} overflows. */
-    private boolean enabled = true;
+    private volatile boolean enabled;
+
+    /** */
+    private WALPointer lastWrittenPtr;
 
     /** */
-    public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) {
+    public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) throws IgniteCheckedException {
         this.log = log;
 
+        serializer = new RecordSerializerFactoryImpl(
+            cctx, (recType, recPtr) -> recType == WALRecord.RecordType.DATA_RECORD_V2)
+            .marshalledMode(true)
+            .createSerializer(serializerVer);
+
         cdcBuf = new CdcBuffer(cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize());
         worker = new CdcWorker(cctx, log, cdcBuf);
     }
@@ -52,16 +81,45 @@ public class CdcProcessor {
         if (!enabled)
             return;
 
-        if (log.isDebugEnabled())
-            log.debug("Offerring a data bucket to the CDC buffer [len=" + (dataBuf.limit() - dataBuf.position()) + ']');
+        try (WALIterator walIt = new ByteBufferWalIterator(dataBuf, serializer, lastWrittenPtr)) {
+            while (walIt.hasNext()) {
+                MarshalledRecord rec = (MarshalledRecord)walIt.next().getValue();
+
+                if (log.isDebugEnabled())
+                    log.debug("Offerring a data bucket to the CDC buffer [len=" + rec.buffer().limit() + ']');
+
+                if (!cdcBuf.offer(rec.buffer())) {
+                    enabled = false;
+
+                    log.warning("CDC buffer has overflowed. Stop realtime mode of CDC.");
 
-        if (!cdcBuf.offer(dataBuf)) {
-            enabled = false;
+                    return;
+                }
+            }
 
-            log.warning("CDC buffer has overflowed. Stop realtime mode of CDC.");
+            assert walIt.lastRead().isPresent();
+
+            lastWrittenPtr = walIt.lastRead().get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
     }
 
+    /**
+     * Enable realtime CDC.
+     *
+     * @param lastWrittenPtr Pointer to last written record, excluded from CDC.
+     */
+    public void enable(@Nullable WALPointer lastWrittenPtr) {
+        if (enabled)
+            return;
+
+        this.lastWrittenPtr = lastWrittenPtr;
+
+        enabled = true;
+    }
+
     /** Start CDC worker. */
     public void start() {
         worker.restart();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
index 5fad6492023..07ad3d8ba26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
@@ -17,16 +17,27 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.cdc;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.cdc.CdcEventImpl;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcRecord;
 import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcStopRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInputImpl;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
 
@@ -50,6 +61,9 @@ public class CdcWorker extends GridWorker {
     /** */
     private final GridCacheSharedContext<?, ?> cctx;
 
+    /** */
+    private final RecordDataV2Serializer dataRecSer;
+
     /** */
     public CdcWorker(GridCacheSharedContext<?, ?> cctx, IgniteLogger log, CdcBuffer cdcBuf) {
         super(cctx.igniteInstanceName(),
@@ -61,52 +75,96 @@ public class CdcWorker extends GridWorker {
         this.cdcBuf = cdcBuf;
 
         consumer = cctx.gridConfig().getDataStorageConfiguration().getCdcConsumer();
+
+        // TODO: get version from WAL manager?
+        dataRecSer = new RecordDataV2Serializer(cctx);
     }
 
     /** */
     @Override public void body() {
-        while (!isCancelled()) {
-            updateHeartbeat();
+        // TODO: concurrent stop? Do I need to acquire a lock here?
+        if (cctx.kernalContext().isStopping())
+            return;
 
-            if (cdcBuf.overflowed()) {
-                log(new RealtimeCdcStopRecord());
+        try {
+            while (!isCancelled()) {
+                updateHeartbeat();
 
-                cancel();
+                if (cdcBuf.overflowed()) {
+                    log(new RealtimeCdcStopRecord());
 
-                return;
-            }
+                    cancel();
+
+                    return;
+                }
+
+                ByteBuffer data = cdcBuf.poll();
 
-            ByteBuffer data = cdcBuf.poll();
+                if (data == null) {
+                    LockSupport.parkNanos(cdcBufPollTimeout * 1_000_000);  // millis to nanos.
 
-            if (data == null) {
-                LockSupport.parkNanos(cdcBufPollTimeout * 1_000_000);  // millis to nanos.
+                    continue;
+                }
 
-                continue;
+                if (log.isDebugEnabled())
+                    log.debug("Poll a data bucket from CDC buffer [len=" + (data.limit() - data.position()) + ']');
+
+                // TODO: Consumer must not block this system thread. Or this thread should not be system thread?
+                if (consumer.consume(cdcEvents(data)))
+                    log(new RealtimeCdcRecord());
             }
+        }
+        catch (Exception e) {
+            cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
+        }
+    }
+
+    /** */
+    private Collection<CdcEvent> cdcEvents(ByteBuffer buf) throws IgniteCheckedException, IOException {
+        List<CdcEvent> cdcEvts = new ArrayList<>();
+
+        while (buf.hasRemaining()) {
+            int recPos = buf.position();
+
+            // -1 is a magic number here. See RecordV2Serializer#recordIO#readWithHeaders, marshaller mode.
+            WALRecord.RecordType type = WALRecord.RecordType.fromIndex(buf.get() - 1);
+
+            assert type == WALRecord.RecordType.DATA_RECORD_V2 : type;
 
-            if (log.isDebugEnabled())
-                log.debug("Poll a data bucket from CDC buffer [len=" + (data.limit() - data.position()) + ']');
+            buf.position(recPos + 1 + 8 + 4);  // type + walSegIdx + fileOff.
+            int size = buf.getInt();
 
-            // TODO: Consumer must not block this system thread. Or this thread should not be system thread?
-            if (consumer.consume(data))
-                log(new RealtimeCdcRecord());
+            DataRecord dataRec = (DataRecord)dataRecSer.readRecord(type, new ByteBufferBackedDataInputImpl().buffer(buf), size);
+
+            for (DataEntry e: dataRec.writeEntries()) {
+                CacheObjectValueContext coctx = cctx.cacheObjectContext(e.cacheId());
+
+                cdcEvts.add(new CdcEventImpl(
+                    e.key().value(coctx, false),
+                    e.value().valueBytes(coctx),
+                    (e.flags() & DataEntry.PRIMARY_FLAG) != 0,
+                    e.partitionId(),
+                    e.writeVersion(),
+                    e.cacheId(),
+                    e.expireTime()
+                ));
+            }
+
+            buf.position(buf.position() + 4);  // Skip CRC.
         }
+
+        return cdcEvts;
     }
 
     /** */
     // TODO: rethink after IGNITE-19637. NULL might return during node start up, then overflowing was during memory restore.
     //       What to do in such case?
-    private void log(WALRecord rec) {
-        try {
-            if (cctx.wal().log(rec) == null) {
-                long maxCdcBufSize = cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize();
+    private void log(WALRecord rec) throws IgniteCheckedException {
+        if (cctx.wal().log(rec) == null) {
+            long maxCdcBufSize = cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize();
 
-                log.error("Realtime CDC misses writing WAL record. CDC buffer size might be too low" +
-                    " [rec=" + rec + ", maxCdcBufSize=" + maxCdcBufSize + ']');
-            }
-        }
-        catch (IgniteCheckedException e) {
-            cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
+            log.error("Realtime CDC failed writing WAL record. CDC buffer size might be too low" +
+                " [rec=" + rec + ", maxCdcBufSize=" + maxCdcBufSize + ']');
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java
index 2e11c868429..e1aade1140f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java
@@ -22,55 +22,42 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.HEADER_RECORD;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
 
 /** Byte Buffer WAL Iterator */
 public class ByteBufferWalIterator extends AbstractWalRecordsIteratorAdapter {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
+    /** WAL records serializer. */
     private final RecordSerializer serializer;
 
-    /** */
+    /** Buffer input wrapper. */
     private final ByteBufferBackedDataInputImpl dataInput;
 
-    /** */
-    private WALPointer expWalPtr;
+    /** Last read pointer. */
+    private @Nullable WALPointer lastReadPtr;
 
     /** */
     public ByteBufferWalIterator(
-        GridCacheSharedContext<?, ?> cctx,
         ByteBuffer byteBuf,
-        int ver,
-        WALPointer walPointer
+        RecordSerializer serializer,
+        @Nullable WALPointer lastWrittenPtr
     ) throws IgniteCheckedException {
-        this(cctx, byteBuf, ver, walPointer, null);
-    }
+        this.serializer = serializer;
 
-    /** */
-    public ByteBufferWalIterator(
-        GridCacheSharedContext<?, ?> cctx,
-        ByteBuffer byteBuf,
-        int ver,
-        WALPointer expWalPtr,
-        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter
-    ) throws IgniteCheckedException {
-        serializer = new RecordSerializerFactoryImpl(cctx, readTypeFilter).createSerializer(ver);
+        lastReadPtr = lastWrittenPtr;
 
         dataInput = new ByteBufferBackedDataInputImpl();
 
         dataInput.buffer(byteBuf);
 
-        this.expWalPtr = expWalPtr;
-
         advance();
     }
 
@@ -85,11 +72,16 @@ public class ByteBufferWalIterator extends AbstractWalRecordsIteratorAdapter {
             if (curRec == null)
                 skipHeader();
 
-            WALRecord rec = serializer.readRecord(dataInput, expWalPtr);
+            WALPointer nextPtr = new WALPointer(lastReadPtr.index(), lastReadPtr.fileOffset() + lastReadPtr.length(), 0);
+
+            WALRecord rec = serializer.readRecord(dataInput, nextPtr);
+
+            // TODO: concurrency between FilteredRecord#size in different WAL consumers? compaction thread vs cdc thread?
+            nextPtr.length(rec.size());
 
-            result = new IgniteBiTuple<>(rec.position(), rec);
+            result = new IgniteBiTuple<>(nextPtr, rec);
 
-            expWalPtr = new WALPointer(expWalPtr.index(), expWalPtr.fileOffset() + rec.size(), 0);
+            lastReadPtr = nextPtr;
         }
         catch (SegmentEofException e) {
             return null;
@@ -110,12 +102,17 @@ public class ByteBufferWalIterator extends AbstractWalRecordsIteratorAdapter {
         WALRecord.RecordType recType = WALRecord.RecordType.fromIndex(type - 1);
 
         if (recType == HEADER_RECORD) {
+            long segIdx = readPosition(dataInput).index();
+
             dataInput.buffer().position(position + HEADER_RECORD_SIZE);
 
-            expWalPtr = new WALPointer(expWalPtr.index(), expWalPtr.fileOffset() + HEADER_RECORD_SIZE, 0);
+            lastReadPtr = new WALPointer(segIdx, 0, HEADER_RECORD_SIZE);
         }
-        else
+        else {
+            assert lastReadPtr != null;
+
             dataInput.buffer().position(position);
+        }
     }
 
     /** {@inheritDoc} */
@@ -127,6 +124,6 @@ public class ByteBufferWalIterator extends AbstractWalRecordsIteratorAdapter {
 
     /** {@inheritDoc} */
     @Override public Optional<WALPointer> lastRead() {
-        throw new UnsupportedOperationException();
+        return Optional.of(lastReadPtr);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 4e5a9358120..1a929018b6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -822,6 +822,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         assert (isArchiverEnabled() && archiver != null) || (!isArchiverEnabled() && archiver == null) :
             "Trying to restore FileWriteHandle on deactivated write ahead log manager";
 
+        if (cdcProc != null)
+            cdcProc.enable(filePtr);
+
         fileHandleManager.resumeLogging();
 
         updateCurrentHandle(restoreWriteHandle(filePtr), null);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index e0a779fa61a..dc281831bb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -387,10 +387,12 @@ public class FileHandleManagerImpl implements FileHandleManager {
 
                             writeBuffer(seg.position(), seg.buffer());
 
+                            // TODO: do not duplicate if processor disabled.
                             if (cdcProc != null) {
                                 ByteBuffer cdcBuf = seg.buffer().duplicate();
                                 cdcBuf.position(bufPos);
                                 cdcBuf.limit(seg.buffer().limit());
+                                cdcBuf.order(seg.buffer().order());
 
                                 cdcProc.collect(cdcBuf);
                             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index a1fe746eeb6..44d7c22718b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -422,6 +422,7 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
                             ByteBuffer cdcBuf = buf.buf.duplicate();
                             cdcBuf.position(off);
                             cdcBuf.limit(off + len);
+                            cdcBuf.order(buf.buf.order());
 
                             cdcProc.collect(cdcBuf);
                         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index 59178e497ba..15db2139533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -810,6 +810,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle
                 ByteBuffer cdcBuf = buf.duplicate();
                 cdcBuf.position(bufPos);
                 cdcBuf.limit(buf.limit());
+                cdcBuf.order(buf.order());
 
                 cdcProc.collect(cdcBuf);
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
index c42a31f9711..3afcc5ce754 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
@@ -18,14 +18,17 @@
 package org.apache.ignite.internal.processors.cache.persistence.cdc;
 
 import java.io.File;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -39,7 +42,9 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -93,7 +98,7 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(instanceName);
 
-        consumer = new ByteBufferCdcConsumer(maxCdcBufSize, commitCnt);
+        consumer = new ByteBufferCdcConsumer(commitCnt);
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setMaxCdcBufferSize(maxCdcBufSize)
@@ -134,9 +139,12 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
     /** */
     @Test
     public void testCdcBufferOverflow() throws Exception {
+        // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records (HEADER_RECORD).
+        assumeFalse(walMode == WALMode.FSYNC);
+
         maxCdcBufSize = 100 * (int)U.KB;
 
-        checkCdcBufferOverflow(10 * (int)U.KB, 100, true);
+        checkCdcBufferOverflow(buildEntries(100, 10 * (int)U.KB), true);
     }
 
     /** */
@@ -144,12 +152,15 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
     public void testCdcDisabled() throws Exception {
         cdcEnabled = false;
 
-        checkCdcBufferOverflow(10 * (int)U.KB, 100, false);
+        checkCdcBufferOverflow(buildEntries(100, 10 * (int)U.KB), false);
     }
 
     /** */
     @Test
     public void testCdcRecords() throws Exception {
+        // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records (HEADER_RECORD).
+        assumeFalse(walMode == WALMode.FSYNC);
+
         maxCdcBufSize = 100 * (int)U.MB;
         commitCnt = new AtomicInteger();
 
@@ -173,7 +184,7 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
 
         stopGrid(0);
 
-        try (WALIterator walIt = walIter(walSegments())) {
+        try (WALIterator walIt = walIter(null)) {
             int cdcRecCnt = 0;
 
             while (walIt.hasNext()) {
@@ -188,57 +199,50 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
     /** */
     @Test
     public void testCdcBufferContent() throws Exception {
-        // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records.
+        // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records (HEADER_RECORD).
         assumeFalse(walMode == WALMode.FSYNC);
 
         maxCdcBufSize = 10 * (int)U.MB;
 
-        stopLatch = new CountDownLatch(1);
-
-        checkCdcBufferOverflow((int)U.KB, 100, false);
-
-        U.awaitQuiet(stopLatch);
+        int entriesCnt = 1;
 
-        File walSegments = walSegments();
+        List<T2<Integer, byte[]>> expEntries = buildEntries((int)U.KB, entriesCnt);
 
-        WALIterator it = walIter(walSegments);
+        checkCdcBufferOverflow(expEntries, false);
 
-        while (it.hasNext())
-            it.next();
+        assertEquals(entriesCnt, expEntries.size());
 
-        WALPointer ptr = it.lastRead().get();
-        int length = ptr.fileOffset() + ptr.length();
+        List<T2<Integer, byte[]>> cdcEntries = consumer.buf.stream()
+            .map(e -> new T2<>((int)e.key(), (byte[])e.value()))
+            .collect(Collectors.toList());
 
-        File seg = Arrays.stream(walSegments.listFiles()).sorted().findFirst().get();
+        assertEquals(expEntries.size(), cdcEntries.size());
 
-        byte[] walSegData = Files.readAllBytes(seg.toPath());
-
-        int step = 100;
-
-        for (int off = 0; off < length; off += step) {
-            int l = off + step < length ? step : length - off;
+        for (int i = 0; i < expEntries.size(); i++) {
+            assertEquals(expEntries.get(i).getKey(), cdcEntries.get(i).getKey());
+            assertTrue(Arrays.equals(expEntries.get(i).getValue(), cdcEntries.get(i).getValue()));
+        }
+    }
 
-            byte[] testWalData = new byte[l];
-            byte[] testCdcData = new byte[l];
+    /** */
+    private List<T2<Integer, byte[]>> buildEntries(int size, int cnt) {
+        List<T2<Integer, byte[]>> entries = new ArrayList<>();
 
-            ByteBuffer buf = ByteBuffer.wrap(walSegData);
-            buf.position(off);
-            buf.get(testWalData, 0, l);
+        for (int i = 0; i < cnt; i++) {
+            byte[] data = new byte[size];
 
-            buf = ByteBuffer.wrap(consumer.buf.array());
-            buf.position(off);
-            buf.get(testCdcData, 0, l);
+            ThreadLocalRandom.current().nextBytes(data);
 
-            assertTrue(
-                "Offset " + off + "/" + length + "\n" +
-                "EXPECT " + Arrays.toString(testWalData) + "\n" +
-                "ACTUAL " + Arrays.toString(testCdcData),
-                Arrays.equals(testWalData, testCdcData));
+            entries.add(new T2<>(i, data));
         }
+
+        return entries;
     }
 
     /** */
-    private void checkCdcBufferOverflow(int entrySize, int entryCnt, boolean shouldOverflow) throws Exception {
+    private void checkCdcBufferOverflow(List<T2<Integer, byte[]>> entries, boolean shouldOverflow) throws Exception {
+        stopLatch = cdcEnabled ? new CountDownLatch(entries.size()) : null;
+
         LogListener lsnr = LogListener.matches("CDC buffer has overflowed. Stop realtime mode of CDC.")
             .times(shouldOverflow ? 1 : 0)
             .build();
@@ -251,29 +255,24 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
 
         IgniteCache<Integer, byte[]> cache = crd.cache(DEFAULT_CACHE_NAME);
 
-        for (int i = 0; i < entryCnt; i++) {
-            byte[] data = new byte[entrySize];
-
-            Arrays.fill(data, (byte)1);
-
-            cache.put(i, data);
-        }
+        for (T2<Integer, byte[]> e: entries)
+            cache.put(e.getKey(), e.getValue());
 
         forceCheckpoint(crd);
 
+        if (stopLatch != null && !shouldOverflow)
+            U.awaitQuiet(stopLatch);
+
         stopGrid(0);
 
         assertTrue(lsnr.check());
 
-        try (WALIterator walIt = walIter(walSegments())) {
-            int stopRecCnt = 0;
+        try (WALIterator walIt = walIter((recType, recPtr) -> recType == WALRecord.RecordType.REALTIME_STOP_CDC_RECORD)) {
+            AtomicInteger stopRecCnt = new AtomicInteger();
 
-            while (walIt.hasNext()) {
-                if (walIt.next().getValue().type() == WALRecord.RecordType.REALTIME_STOP_CDC_RECORD)
-                    stopRecCnt++;
-            }
+            walIt.forEach((rec) -> stopRecCnt.incrementAndGet());
 
-            assertEquals(shouldOverflow ? 1 : 0, stopRecCnt);
+            assertEquals(shouldOverflow ? 1 : 0, stopRecCnt.get());
         }
     }
 
@@ -285,11 +284,12 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
     }
 
     /** Get iterator over WAL. */
-    private WALIterator walIter(File walSegments) throws Exception {
+    private WALIterator walIter(@Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> filter) throws Exception {
         IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
 
         IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder()
-            .filesOrDirs(walSegments);
+            .filesOrDirs(walSegments())
+            .filter(filter);
 
         return factory.iterator(params);
     }
@@ -297,32 +297,31 @@ public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
     /** */
     private static class ByteBufferCdcConsumer implements CdcBufferConsumer {
         /** */
-        private final ByteBuffer buf;
+        private final List<CdcEvent> buf = new ArrayList<>();
 
         /** */
         private final AtomicInteger commitCnt;
 
         /** */
-        ByteBufferCdcConsumer(int maxCdcBufSize, @Nullable AtomicInteger commitCnt) {
+        ByteBufferCdcConsumer(@Nullable AtomicInteger commitCnt) {
             this.commitCnt = commitCnt;
-
-            buf = ByteBuffer.allocate(maxCdcBufSize);
-
-            Arrays.fill(buf.array(), (byte)0);
-
-            buf.position(0);
         }
 
         /** {@inheritDoc} */
-        @Override public boolean consume(ByteBuffer data) {
-            buf.put(data);
+        @Override public boolean consume(Collection<CdcEvent> data) {
+            buf.addAll(data);
+
+            if (stopLatch != null) {
+                for (int i = 0; i < data.size(); i++)
+                    stopLatch.countDown();
+            }
 
             return commitCnt != null && commitCnt.decrementAndGet() >= 0;
         }
 
         /** */
         @Override public void close() {
-            if (stopLatch != null)
+            for (int i = 0; i < stopLatch.getCount(); i++)
                 stopLatch.countDown();
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java
index cd6fd9f477d..6309004d687 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java
@@ -189,8 +189,7 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         byteBuf.flip();
 
-        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+        WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
 
         Iterator<DataEntry> dataEntriesIter = entries.iterator();
 
@@ -228,8 +227,7 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         byteBuf.flip();
 
-        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+        WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
 
         Iterator<WALRecord> recordsIter = records.iterator();
 
@@ -291,9 +289,13 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         byteBuf.flip();
 
-        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0),
-            (t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL);
+        RecordSerializer serializer = new RecordSerializerFactoryImpl(
+            sharedCtx,
+            (t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL
+        )
+            .createSerializer(RecordSerializerFactory.LATEST_SERIALIZER_VERSION);
+
+        WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
 
         Iterator<DataEntry> dataEntriesIter = entries.iterator();
 
@@ -356,8 +358,7 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         byteBuf.limit((position1 + position2) >> 1);
 
-        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+        WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
 
         assertTrue(walIter.hasNext());
 
@@ -384,8 +385,7 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         byteBuf.flip();
 
-        WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0));
+        WALIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer(idx, 0, 0));
 
         assertFalse(walIter.hasNext());
 
@@ -425,8 +425,9 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         int shift = adaptTest ? -1 : 0;
 
-        ByteBufferWalIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, pos, 0));
+        WALPointer ptr = hasHdr ? null : new WALPointer(idx, pos, 0);
+
+        ByteBufferWalIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, ptr);
 
         Map<WALRecord.RecordType, Integer> counts = new EnumMap<>(WALRecord.RecordType.class);
 
@@ -571,8 +572,7 @@ public class ByteBufferWalIteratorTest extends GridCommonAbstractTest {
 
         positions.add(byteBuf.position());
 
-        ByteBufferWalIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf,
-            RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer((int)fd.idx(), 0, 0));
+        ByteBufferWalIterator walIter = new ByteBufferWalIterator(byteBuf, serializer, new WALPointer((int)fd.idx(), 0, 0));
 
         positions.add(byteBuf.position());