Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 07:07:45 UTC

[camel] branch main updated: CAMEL-18148: implements a WAL strategy for the Resume API

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new dbdd44b8b1c CAMEL-18148: implements a WAL strategy for the Resume API
dbdd44b8b1c is described below

commit dbdd44b8b1c7bec926c5f84cdefa4074eca274c3
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Oct 6 15:42:52 2022 +0200

    CAMEL-18148: implements a WAL strategy for the Resume API
---
 bom/camel-bom/pom.xml                              |   5 +
 catalog/camel-allcomponents/pom.xml                |   4 +
 .../org/apache/camel/catalog/others.properties     |   1 +
 .../org/apache/camel/catalog/others/wal.json       |  15 +
 components/camel-wal/pom.xml                       |  71 +++++
 .../services/org/apache/camel/other.properties     |   7 +
 .../camel-wal/src/generated/resources/wal.json     |  15 +
 components/camel-wal/src/main/docs/wal-docs.adoc   |  43 +++
 .../camel/component/wal/DefaultLogSupervisor.java  |  73 +++++
 .../org/apache/camel/component/wal/EntryInfo.java  |  71 +++++
 .../org/apache/camel/component/wal/Header.java     |  59 ++++
 .../org/apache/camel/component/wal/IOUtil.java     | 133 +++++++++
 .../org/apache/camel/component/wal/LogEntry.java   | 124 ++++++++
 .../org/apache/camel/component/wal/LogReader.java  | 241 ++++++++++++++++
 .../apache/camel/component/wal/LogSupervisor.java  |  37 +++
 .../org/apache/camel/component/wal/LogWriter.java  | 277 ++++++++++++++++++
 .../camel/component/wal/PersistedLogEntry.java     |  36 +++
 .../apache/camel/component/wal/TransactionLog.java | 216 ++++++++++++++
 .../component/wal/WriteAheadResumeStrategy.java    | 311 +++++++++++++++++++++
 .../component/wal/exceptions/BufferOverflow.java   |  43 +++
 .../wal/exceptions/BufferTooSmallException.java    |  43 +++
 .../wal/exceptions/InvalidRecordException.java     |  25 ++
 .../apache/camel/component/wal/LogReaderTest.java  |  46 +++
 .../apache/camel/component/wal/LogTestBase.java    |  91 ++++++
 .../camel/component/wal/LogWriterRollOverTest.java |  87 ++++++
 .../LogWriterRollOverUpdateAfterDiscardTest.java   | 107 +++++++
 .../wal/LogWriterRollOverUpdateAsyncBase.java      | 120 ++++++++
 .../wal/LogWriterRollOverUpdateAsyncTest.java      |  55 ++++
 ...riterRollOverUpdateAsyncWithContentionTest.java |  59 ++++
 .../component/wal/LogWriterSyncUpdateTest.java     |  82 ++++++
 .../apache/camel/component/wal/LogWriterTest.java  |  72 +++++
 .../camel/component/wal/TransactionLogTest.java    |  49 ++++
 components/pom.xml                                 |   3 +-
 .../src/test/resources/log4j2.properties           |   6 +
 .../modules/others/examples/json/wal.json          |   1 +
 docs/components/modules/others/nav.adoc            |   1 +
 docs/components/modules/others/pages/wal-docs.adoc |   1 +
 parent/pom.xml                                     |   5 +
 38 files changed, 2634 insertions(+), 1 deletion(-)

diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml
index 8520c9cd7e3..1780b17f367 100644
--- a/bom/camel-bom/pom.xml
+++ b/bom/camel-bom/pom.xml
@@ -2174,6 +2174,11 @@
         <artifactId>camel-vm</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>camel-wal</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
         <artifactId>camel-weather</artifactId>
diff --git a/catalog/camel-allcomponents/pom.xml b/catalog/camel-allcomponents/pom.xml
index d7c9cbd5696..ea51eac430e 100644
--- a/catalog/camel-allcomponents/pom.xml
+++ b/catalog/camel-allcomponents/pom.xml
@@ -1473,6 +1473,10 @@
 			<groupId>org.apache.camel</groupId>
 			<artifactId>camel-vm</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-wal</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.camel</groupId>
 			<artifactId>camel-weather</artifactId>
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
index 608bd730d1f..85c7d3bdfee 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
@@ -57,6 +57,7 @@ test-spring-junit5
 threadpoolfactory-vertx
 tracing
 undertow-spring-security
+wal
 xml-io-dsl
 xml-jaxb-dsl
 yaml-dsl
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/wal.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/wal.json
new file mode 100644
index 00000000000..6cbda8ec5d0
--- /dev/null
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/wal.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "wal",
+    "title": "Camel WAL component for the Resume API",
+    "description": "Camel WAL component for the Resume API",
+    "deprecated": false,
+    "firstVersion": "3.20.0",
+    "label": "java",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-wal",
+    "version": "3.20.0-SNAPSHOT"
+  }
+}
diff --git a/components/camel-wal/pom.xml b/components/camel-wal/pom.xml
new file mode 100644
index 00000000000..736bc5fdf3d
--- /dev/null
+++ b/components/camel-wal/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.20.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>camel-wal</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Camel :: WAL</name>
+    <description>Camel WAL component for the Resume API</description>
+
+    <properties>
+        <firstVersion>3.20.0</firstVersion>
+        <label>java</label>
+        <title>Camel WAL component for the Resume API</title>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+</project>
diff --git a/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/other.properties
new file mode 100644
index 00000000000..2ff122e11f1
--- /dev/null
+++ b/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/other.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+name=wal
+groupId=org.apache.camel
+artifactId=camel-wal
+version=3.20.0-SNAPSHOT
+projectName=Camel :: WAL
+projectDescription=Camel WAL component for the Resume API
diff --git a/components/camel-wal/src/generated/resources/wal.json b/components/camel-wal/src/generated/resources/wal.json
new file mode 100644
index 00000000000..6cbda8ec5d0
--- /dev/null
+++ b/components/camel-wal/src/generated/resources/wal.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "wal",
+    "title": "Camel WAL component for the Resume API",
+    "description": "Camel WAL component for the Resume API",
+    "deprecated": false,
+    "firstVersion": "3.20.0",
+    "label": "java",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-wal",
+    "version": "3.20.0-SNAPSHOT"
+  }
+}
diff --git a/components/camel-wal/src/main/docs/wal-docs.adoc b/components/camel-wal/src/main/docs/wal-docs.adoc
new file mode 100644
index 00000000000..98dedde8a47
--- /dev/null
+++ b/components/camel-wal/src/main/docs/wal-docs.adoc
@@ -0,0 +1,43 @@
+= WAL Component
+:doctitle: Write Ahead Log Strategy
+:shortname: wal
+:artifactid: camel-wal
+:description: A write-ahead resume strategy
+:since: 3.20
+:supportlevel: Preview
+:core:
+//Manually maintained attributes
+
+*Since Camel {since}*
+
+*{component-header}*
+
+The WAL component provides a resume strategy that uses a write-ahead log to keep a transaction log of the
+in-processing and processed records. This strategy works by wrapping another resume strategy. It increases the
+reliability of the Resume API by ensuring that records are saved locally before being sent to the remote data
+storage used by the wrapped strategy, thus guaranteeing that records can be recovered if that system crashes.
+
+
+== Usage
+
+Because this strategy wraps another one, the wrapped strategy should be created first and then passed as an argument when creating this one:
+
+[source,java]
+----
+SomeOtherResumeStrategy resumeStrategy = new SomeOtherResumeStrategy();
+final String logFile = System.getProperty("wal.log.file");
+
+WriteAheadResumeStrategy writeAheadResumeStrategy = new WriteAheadResumeStrategy(new File(logFile), resumeStrategy);
+----
+
+Subsequently, this strategy (rather than the wrapped one) should be bound in the registry:
+
+[source,java]
+----
+getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, writeAheadResumeStrategy);
+...
+
+from("file:{{input.dir}}?noop=true&recursive=true&preSort=true")
+    .resumable(ResumeStrategy.DEFAULT_NAME)
+    .process(this::process)
+    .to("file:{{output.dir}}");
+----
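+
+=== Low-level transaction log API
+
+The following is a minimal, test-style sketch of the log writer and reader that back this strategy. It assumes
+code living in the `org.apache.camel.component.wal` package (the `LogEntry` type is package-private) and uses an
+illustrative flush interval and temporary file:
+
+[source,java]
+----
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+File logFile = File.createTempFile("camel", ".wal");
+
+// Flush the log to disk every 100 milliseconds
+try (LogWriter writer = new LogWriter(logFile, new DefaultLogSupervisor(100))) {
+    LogEntry entry = new LogEntry(LogEntry.EntryState.NEW, 0,
+            "file-001".getBytes(StandardCharsets.UTF_8), 0,
+            "offset-42".getBytes(StandardCharsets.UTF_8));
+
+    // Append the record, then mark it as processed once the work is done
+    EntryInfo.CachedEntryInfo entryInfo = writer.append(entry);
+    writer.updateState(entryInfo, LogEntry.EntryState.PROCESSED);
+}
+
+// Read the records back (e.g.: when recovering after a crash)
+try (LogReader reader = new LogReader(logFile)) {
+    PersistedLogEntry persisted;
+    while ((persisted = reader.readEntry()) != null) {
+        System.out.println(new String(persisted.getKey(), StandardCharsets.UTF_8)
+                + " -> " + persisted.getEntryState());
+    }
+}
+----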
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/DefaultLogSupervisor.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/DefaultLogSupervisor.java
new file mode 100644
index 00000000000..34a8156d247
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/DefaultLogSupervisor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A log supervisor that executes a task in the background. It is used to flush the data to disk at regular
+ * intervals.
+ */
+public class DefaultLogSupervisor implements LogSupervisor {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultLogSupervisor.class);
+    private long interval;
+
+    private ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * Constructs a new log supervisor
+     * 
+     * @param interval the interval between executions of the task
+     */
+    public DefaultLogSupervisor(long interval) {
+        this(interval, Executors.newScheduledThreadPool(1));
+    }
+
+    /**
+     * Constructs a new log supervisor
+     * 
+     * @param interval                 the interval between executions of the task
+     * @param scheduledExecutorService the executor service to use for running the task
+     */
+    public DefaultLogSupervisor(long interval, ScheduledExecutorService scheduledExecutorService) {
+        this.interval = interval;
+        this.scheduledExecutorService = scheduledExecutorService;
+    }
+
+    @Override
+    public void start(Runnable runnable) {
+        scheduledExecutorService.scheduleAtFixedRate(runnable, 0, interval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void stop() {
+        scheduledExecutorService.shutdown();
+        try {
+            if (!scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
+                scheduledExecutorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Failed to shutdown log flusher: {}", e.getMessage(), e);
+        }
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/EntryInfo.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/EntryInfo.java
new file mode 100644
index 00000000000..c752efbe4fe
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/EntryInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+/**
+ * Contains information about a log entry
+ */
+public class EntryInfo {
+    private final long position;
+
+    private EntryInfo(long position) {
+        this.position = position;
+    }
+
+    public long getPosition() {
+        return position;
+    }
+
+    /**
+     * Contains information about a log entry that is hot in the cache
+     */
+    public static class CachedEntryInfo extends EntryInfo {
+        private final TransactionLog.LayerInfo layerInfo;
+
+        CachedEntryInfo(long position, TransactionLog.LayerInfo layerInfo) {
+            super(position);
+            this.layerInfo = layerInfo;
+        }
+
+        public TransactionLog.LayerInfo getLayerInfo() {
+            return layerInfo;
+        }
+
+    }
+
+    /**
+     * Creates a new entry info instance for entries persisted at the given position
+     * 
+     * @param  position the position of the entry
+     * @return          a new entry info
+     */
+    public static EntryInfo createForPersisted(long position) {
+        return new EntryInfo(position);
+    }
+
+    /**
+     * Creates a new entry info instance for entries cached at the given position and layer
+     * 
+     * @param  position  the position of the entry
+     * @param  layerInfo the layer on the transaction cache
+     * @return           a new entry info
+     */
+    public static CachedEntryInfo createForCached(long position, TransactionLog.LayerInfo layerInfo) {
+        return new CachedEntryInfo(position, layerInfo);
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/Header.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/Header.java
new file mode 100644
index 00000000000..80bf59a0ee3
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/Header.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+/**
+ * This container represents a header of a transaction log file
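+ * <p>
+ * On disk, the header is laid out as an 8-byte format name ("camel-wa") followed by an integer file version.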
+ */
+public final class Header {
+    public static final int FORMAT_NAME_SIZE = 8;
+    public static final String FORMAT_NAME = "camel-wa";
+    public static final int CURRENT_FILE_VERSION = 1;
+    public static final Header WA_DEFAULT_V1;
+    public static final int BYTES;
+    private final String formatName;
+    private final int fileVersion;
+
+    static {
+        WA_DEFAULT_V1 = new Header(FORMAT_NAME, CURRENT_FILE_VERSION);
+
+        BYTES = FORMAT_NAME_SIZE + Integer.BYTES;
+    }
+
+    Header(final String formatName, int fileVersion) {
+        if (formatName == null || formatName.isEmpty()) {
+            throw new IllegalArgumentException("The format name is not valid: it's either empty or null");
+        }
+
+        if (formatName.length() > FORMAT_NAME_SIZE) {
+            throw new IllegalArgumentException(
+                    "The format name '" + formatName + "' is too short. Its length must be less than " + FORMAT_NAME_SIZE);
+        }
+
+        this.formatName = formatName;
+        this.fileVersion = fileVersion;
+    }
+
+    public String getFormatName() {
+        return formatName;
+    }
+
+    public int getFileVersion() {
+        return fileVersion;
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/IOUtil.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/IOUtil.java
new file mode 100644
index 00000000000..986742b5fcc
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/IOUtil.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.camel.component.wal.exceptions.BufferOverflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * I/O utilities for the write-ahead log
+ */
+final class IOUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(IOUtil.class);
+
+    private IOUtil() {
+
+    }
+
+    /**
+     * Writes to the channel at a given position, clearing the source after completion
+     * 
+     * @param  fileChannel the channel to write to
+     * @param  byteBuffer  the buffer containing the bytes to write to the channel
+     * @param  position    the position to write to
+     * @return             the number of bytes written
+     * @throws IOException for any lower-level I/O failure
+     */
+    static long write(FileChannel fileChannel, ByteBuffer byteBuffer, long position) throws IOException {
+        long bytesWritten = 0;
+        byteBuffer.flip();
+
+        while (byteBuffer.hasRemaining()) {
+            bytesWritten += fileChannel.write(byteBuffer, position + bytesWritten);
+        }
+
+        byteBuffer.flip();
+        byteBuffer.clear();
+
+        return bytesWritten;
+    }
+
+    /**
+     * Writes to the channel by appending the data at the end and clearing the source after completion
+     * 
+     * @param  fileChannel the channel to write to
+     * @param  byteBuffer  the buffer containing the bytes to write to the channel
+     * @return             the number of bytes written
+     * @throws IOException for any lower-level I/O failure
+     */
+    static long write(FileChannel fileChannel, ByteBuffer byteBuffer) throws IOException {
+        long bytesWritten = 0;
+        byteBuffer.flip();
+
+        while (byteBuffer.hasRemaining()) {
+            bytesWritten += fileChannel.write(byteBuffer);
+        }
+
+        byteBuffer.flip();
+        byteBuffer.clear();
+
+        return bytesWritten;
+    }
+
+    /**
+     * Serializes an entry to the buffer
+     * 
+     * @param  buffer         the buffer where the entry will be serialized to
+     * @param  entry          the entry to serialize
+     * @throws BufferOverflow if the buffer is too small for the entry
+     */
+    static void serialize(ByteBuffer buffer, LogEntry entry) throws BufferOverflow {
+        serialize(buffer, entry.getEntryState().getCode(), entry.getKeyMetadata(), entry.getKey(), entry.getValueMetadata(),
+                entry.getValue());
+    }
+
+    /**
+     * Serializes an entry to the buffer
+     * 
+     * @param  buffer         the buffer where the entry will be serialized to
+     * @param  entryState     the entry state
+     * @param  keyMetadata    the entry metadata
+     * @param  key            the entry key
+     * @param  valueMetadata  the entry value metadata
+     * @param  value          the entry value
+     * @throws BufferOverflow if the buffer is too small for the entry
+     */
+    static void serialize(
+            ByteBuffer buffer, int entryState, int keyMetadata, byte[] key, int valueMetadata, byte[] value)
+            throws BufferOverflow {
+        checkBufferCapacity(buffer,
+                Integer.BYTES + Integer.BYTES + key.length + Integer.BYTES + Integer.BYTES + value.length);
+
+        buffer.putInt(entryState);
+        buffer.putInt(keyMetadata);
+        buffer.putInt(key.length);
+        buffer.put(key);
+        buffer.putInt(valueMetadata);
+        buffer.putInt(value.length);
+        buffer.put(value);
+    }
+
+    private static void checkBufferCapacity(ByteBuffer byteBuffer, int requestedSize) throws BufferOverflow {
+        final int remaining = byteBuffer.remaining();
+
+        if (remaining < requestedSize) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("There is not enough space on the buffer for an offset entry: {} bytes remaining, {} bytes needed",
+                        remaining, requestedSize);
+            }
+
+            throw new BufferOverflow(remaining, requestedSize);
+        }
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogEntry.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogEntry.java
new file mode 100644
index 00000000000..88faeb2f134
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogEntry.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+/**
+ * Represents a single entry in the log of transactions
+ */
+class LogEntry {
+    public enum EntryState {
+        IGNORED(-1),
+        NEW(1),
+        PROCESSED(10),
+        FAILED(20);
+
+        private final int code;
+
+        EntryState(int code) {
+            this.code = code;
+        }
+
+        public int getCode() {
+            return code;
+        }
+
+        public static EntryState fromInt(int value) {
+            switch (value) {
+                case -1:
+                    return IGNORED;
+                case 1:
+                    return NEW;
+                case 10:
+                    return PROCESSED;
+                case 20:
+                    return FAILED;
+                default:
+                    throw new IllegalArgumentException("Invalid state with value " + value);
+            }
+        }
+
+    }
+
+    private EntryState entryState;
+    private int keyMetadata;
+    private final byte[] key;
+    private int valueMetadata;
+    private final byte[] value;
+
+    LogEntry(EntryState entryState, int keyMetadata, byte[] key, int valueMetadata, byte[] value) {
+        this.entryState = entryState;
+        this.keyMetadata = keyMetadata;
+        this.key = key;
+        this.valueMetadata = valueMetadata;
+        this.value = value;
+    }
+
+    public EntryState getEntryState() {
+        return entryState;
+    }
+
+    public int getKeyMetadata() {
+        return keyMetadata;
+    }
+
+    public void setKeyMetadata(int keyMetadata) {
+        this.keyMetadata = keyMetadata;
+    }
+
+    public int getValueMetadata() {
+        return valueMetadata;
+    }
+
+    public void setValueMetadata(int valueMetadata) {
+        this.valueMetadata = valueMetadata;
+    }
+
+    public byte[] getKey() {
+        return key;
+    }
+
+    public byte[] getValue() {
+        return value;
+    }
+
+    public void setEntryState(EntryState entryState) {
+        this.entryState = entryState;
+    }
+
+    /**
+     * Returns the size of this record (sum of the sizes of: entry state, key metadata, key length, key, value metadata,
+     * value length and value)
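+     * <p>
+     * On disk, each record is laid out as:
+     * <pre>
+     * [state (int)][key metadata (int)][key length (int)][key bytes][value metadata (int)][value length (int)][value bytes]
+     * </pre>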
+     * 
+     * @return the size in bytes
+     */
+    public int size() {
+        return size(key, value);
+    }
+
+    /**
+     * Returns the size of a record (sum of the sizes of: entry state, key metadata, key length, key, value metadata,
+     * value length and value)
+     *
+     * @param  key   the entry key
+     * @param  value the entry value
+     * @return       the size in bytes
+     */
+    public static int size(byte[] key, byte[] value) {
+        return Integer.BYTES + Integer.BYTES + Integer.BYTES + key.length + Integer.BYTES + Integer.BYTES + value.length;
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogReader.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogReader.java
new file mode 100644
index 00000000000..db071f31347
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogReader.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+import org.apache.camel.component.wal.exceptions.BufferTooSmallException;
+import org.apache.camel.component.wal.exceptions.InvalidRecordException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reader for write-ahead log files
+ */
+public class LogReader implements AutoCloseable {
+    public static final int DEFAULT_CAPACITY = 1024 * 512;
+    private static final Logger LOG = LoggerFactory.getLogger(LogReader.class);
+
+    private final FileChannel fileChannel;
+    private final ByteBuffer ioBuffer;
+    private final Header header;
+
+    /**
+     * Constructor
+     *
+     * @param  logFile     the transaction log file
+     * @throws IOException in case of I/O errors
+     */
+    public LogReader(final File logFile) throws IOException {
+        this(logFile, DEFAULT_CAPACITY);
+    }
+
+    /**
+     * Constructor
+     *
+     * @param  logFile     the transaction log file
+     * @param  capacity    the capacity of the I/O buffer, in bytes
+     * @throws IOException in case of I/O errors
+     */
+    public LogReader(final File logFile, int capacity) throws IOException {
+        this.fileChannel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
+        ioBuffer = ByteBuffer.allocateDirect(capacity);
+
+        header = readHeader();
+    }
+
+    /**
+     * Gets the file header
+     *
+     * @return the file header
+     */
+    public Header getHeader() {
+        return header;
+    }
+
+    /**
+     * Reads the header from the file
+     * 
+     * @return             the header or null if the file is empty
+     * @throws IOException in case of lower-level I/O errors
+     */
+    private Header readHeader() throws IOException {
+        if (fileChannel.size() == 0) {
+            return null;
+        }
+
+        ioBuffer.clear();
+        int bytesRead = fileChannel.read(ioBuffer);
+
+        if (bytesRead <= 0) {
+            throw new IllegalArgumentException("The file does not contain a valid header");
+        }
+
+        LOG.trace("Read {} bytes from the file channel", bytesRead);
+        ioBuffer.flip();
+
+        byte[] name = new byte[Header.FORMAT_NAME_SIZE];
+        ioBuffer.get(name, 0, Header.FORMAT_NAME_SIZE);
+        LOG.trace("File format name: '{}'", new String(name));
+
+        int fileVersion = ioBuffer.getInt();
+        LOG.trace("File format version: '{}'", fileVersion);
+
+        return new Header(new String(name), fileVersion);
+    }
+
+    /**
+     * Read an entry from the file.
+     * 
+     * @return             A log entry from the file or null when reaching the end-of-file or if the file is empty
+     * @throws IOException if unable to read the entry
+     */
+    public PersistedLogEntry readEntry() throws IOException {
+        if (header == null) {
+            return null;
+        }
+
+        logBufferInfo();
+
+        if (ioBuffer.hasRemaining()) {
+            return doReadEntry();
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Read it all from the buffer. Fetching again from the channel");
+        }
+
+        if (!reload()) {
+            return null;
+        }
+
+        return doReadEntry();
+    }
+
+    /**
+     * A lower-level routine to read a single entry from the transaction log
+     * 
+     * @return             A log entry from the file or null when reaching the end-of-file or if the file is empty
+     * @throws IOException if unable to read the entry
+     */
+    private PersistedLogEntry doReadEntry() throws IOException {
+        if (ioBuffer.remaining() < Integer.BYTES) {
+            if (!reload()) {
+                return null;
+            }
+        }
+
+        int state = ioBuffer.getInt();
+
+        Slot keySlot = readSlot();
+        Slot valueSlot = readSlot();
+
+        EntryInfo entryInfo = EntryInfo.createForPersisted(fileChannel.position());
+
+        return new PersistedLogEntry(
+                entryInfo, LogEntry.EntryState.fromInt(state), keySlot.metadata, keySlot.data,
+                valueSlot.metadata, valueSlot.data);
+    }
+
+    /**
+     * Reads a data slot (i.e.: containing a key or a value)
+     * 
+     * @return             the data slot
+     * @throws IOException if the record is invalid or the data too large for the buffer
+     */
+    private Slot readSlot() throws IOException {
+        Slot slot = new Slot();
+
+        // The buffer needs to have enough space for the metadata and length.
+        if (ioBuffer.remaining() < (Integer.BYTES * 2)) {
+            if (!reload()) {
+                throw new InvalidRecordException("A data slot within a record is incomplete or malformed");
+            }
+        }
+        slot.metadata = ioBuffer.getInt();
+        slot.length = ioBuffer.getInt();
+
+        if (ioBuffer.capacity() < slot.length) {
+            throw new BufferTooSmallException(ioBuffer.capacity(), slot.length);
+        }
+
+        if (ioBuffer.remaining() < slot.length) {
+            if (!reload()) {
+                throw new InvalidRecordException("A data slot within a record is incomplete or malformed");
+            }
+        }
+
+        slot.data = new byte[slot.length];
+        ioBuffer.get(slot.data);
+
+        return slot;
+    }
+
+    /**
+     * Reloads data into the intermediate buffer, compacting it in the process
+     * 
+     * @return             true if it has read data into the buffer (reloaded) or false otherwise
+     * @throws IOException in case of lower-level I/O errors
+     */
+    private boolean reload() throws IOException {
+        try {
+            ioBuffer.compact();
+
+            int read = fileChannel.read(ioBuffer);
+            if (read > 0) {
+                return true;
+            }
+        } finally {
+            ioBuffer.flip();
+        }
+        return false;
+    }
+
+    private void logBufferInfo() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Remaining: {}", ioBuffer.remaining());
+            LOG.trace("Position: {}", ioBuffer.position());
+            LOG.trace("Has Remaining: {}", ioBuffer.hasRemaining());
+        }
+    }
+
+    /**
+     * Close the reader and release resources
+     */
+    @Override
+    public void close() {
+        try {
+            fileChannel.close();
+        } catch (IOException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * A wrapper for a data slot
+     */
+    private static class Slot {
+        int metadata;
+        int length;
+
+        byte[] data;
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogSupervisor.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogSupervisor.java
new file mode 100644
index 00000000000..2a5bc90600d
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogSupervisor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+/**
+ * A supervisor can be used to specify custom supervising activities (such as flushing to disk, recycling, etc) for the
+ * log writer
+ */
+public interface LogSupervisor {
+
+    /**
+     * Starts the flush policy
+     * 
+     * @param runnable the code to be executed by the log supervisor
+     */
+    void start(Runnable runnable);
+
+    /**
+     * Stops the policy
+     */
+    void stop();
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java
new file mode 100644
index 00000000000..92375ecb92d
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/LogWriter.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A writer for write-ahead log files
+ */
+public final class LogWriter implements AutoCloseable {
+    /**
+     * The default buffer capacity: 512 KiB
+     */
+    public static final int DEFAULT_CAPACITY = 1024 * 512;
+    private static final Logger LOG = LoggerFactory.getLogger(LogWriter.class);
+
+    private final FileChannel fileChannel;
+
+    private final LogSupervisor flushPolicy;
+    private final TransactionLog transactionLog;
+
+    private long startOfRecords;
+
+    /**
+     * Constructs a new log writer with the default capacity {@link LogWriter#DEFAULT_CAPACITY} (512 KiB). If the file
+     * already exists, it will be truncated.
+     *
+     * @param  logFile       the transaction log file
+     * @param  logSupervisor the log supervisor {@link LogSupervisor} for the writer
+     * @throws IOException   in case of I/O errors
+     */
+    public LogWriter(File logFile, LogSupervisor logSupervisor) throws IOException {
+        this(logFile, logSupervisor, DEFAULT_CAPACITY);
+    }
+
+    /**
+     * Constructs a new log writer with the given maximum record count. If the file already exists, it will be
+     * truncated.
+     *
+     * @param  logFile        the transaction log file
+     * @param  logSupervisor  the log supervisor {@link LogSupervisor} for the writer
+     * @param  maxRecordCount the maximum number of records to keep in the file. Beyond this count, entries will be
+     *                        rolled-over.
+     * @throws IOException    in case of I/O errors
+     */
+    LogWriter(File logFile, LogSupervisor logSupervisor, int maxRecordCount) throws IOException {
+        this.fileChannel = FileChannel.open(logFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE,
+                StandardOpenOption.TRUNCATE_EXISTING);
+
+        final Header header = Header.WA_DEFAULT_V1;
+        writeHeader(header);
+
+        this.flushPolicy = logSupervisor;
+        this.transactionLog = new TransactionLog(maxRecordCount);
+        this.flushPolicy.start(this::tryFlush);
+    }
+
+    /**
+     * Flushes the data to disk
+     *
+     * @throws IOException in case of I/O errors
+     */
+    void flush() throws IOException {
+        fileChannel.force(true);
+    }
+
+    private synchronized void tryFlush() {
+        try {
+            flush();
+        } catch (IOException e) {
+            LOG.error("Unable to save record: {}", e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void reset() throws IOException {
+        fileChannel.truncate(startOfRecords);
+        fileChannel.position(startOfRecords);
+    }
+
+    @Override
+    public void close() {
+        try {
+            flushPolicy.stop();
+            flush();
+
+            fileChannel.close();
+        } catch (IOException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    private void writeHeader(final Header header) throws IOException {
+        ByteBuffer headerBuffer = ByteBuffer.allocate(Header.BYTES);
+
+        headerBuffer.put(header.getFormatName().getBytes());
+        headerBuffer.putInt(header.getFileVersion());
+
+        IOUtil.write(fileChannel, headerBuffer);
+
+        startOfRecords = fileChannel.position();
+    }
+
+    /**
+     * Appends an entry to the transaction log file
+     *
+     * @param  entry       the entry to write to the transaction log
+     * @return             An entry info instance with the metadata for the appended log entry
+     * @throws IOException for lower-level I/O errors
+     */
+    public EntryInfo.CachedEntryInfo append(LogEntry entry) throws IOException {
+        final TransactionLog.LayerInfo layerInfo = transactionLog.add(entry);
+        if (layerInfo.getLayer() == 0) {
+            return persist(layerInfo, entry);
+        }
+
+        if (layerInfo.isRollingOver()) {
+            reset();
+        }
+
+        LOG.trace("Writing at position {}", fileChannel.position());
+        EntryInfo.CachedEntryInfo spear = persist(layerInfo, entry);
+
+        final List<EntryInfo> collect = transactionLog.stream()
+                .filter(c -> c != null && c.layerInfo.getLayer() != transactionLog.currentLayer())
+                .map(e -> tryPersist(layerInfo, e.logEntry)).collect(Collectors.toList());
+
+        if (collect.size() > 0) {
+            final EntryInfo lastOnLayer = collect.get(0);
+
+            LOG.trace("Current pos is: {}", fileChannel.position());
+            LOG.trace("Next pos should be: {}", lastOnLayer.getPosition());
+
+            fileChannel.position(lastOnLayer.getPosition());
+            LOG.trace("Current pos now is: {}", fileChannel.position());
+        }
+
+        return spear;
+    }
+
+    /**
+     * Persists an entry to the log
+     * 
+     * @param  layerInfo   the in-memory layer information about the record being persisted
+     * @param  entry       the entry to persist
+     * @param  position    the position in the channel where the entry will be persisted
+     * @throws IOException in case of lower-level I/O errors
+     */
+    private void persist(TransactionLog.LayerInfo layerInfo, LogEntry entry, long position) throws IOException {
+        ByteBuffer updateBuffer = ByteBuffer.allocate(entry.size());
+
+        IOUtil.serialize(updateBuffer, entry);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Position: {} for record {} with key {}", position, layerInfo, new String(entry.getKey()));
+        }
+
+        long size = IOUtil.write(fileChannel, updateBuffer, position);
+
+        if (size == 0) {
+            LOG.warn("No bytes written for the given record!");
+        }
+    }
+
+    /**
+     * Persists an entry to the log
+     * 
+     * @param  layerInfo   the in-memory layer information about the record being persisted
+     * @param  entry       the entry to persist
+     * @return             an {@link EntryInfo} instance with details of the entry that was just persisted
+     * @throws IOException in case of lower-level I/O errors
+     */
+    private EntryInfo.CachedEntryInfo persist(TransactionLog.LayerInfo layerInfo, LogEntry entry) throws IOException {
+        final byte[] key = entry.getKey();
+        final byte[] value = entry.getValue();
+
+        ByteBuffer writeBuffer = ByteBuffer.allocate(LogEntry.size(key, value));
+        IOUtil.serialize(writeBuffer, entry);
+
+        long recordPosition = fileChannel.position();
+        IOUtil.write(fileChannel, writeBuffer);
+
+        return EntryInfo.createForCached(recordPosition, layerInfo);
+    }
+
+    /**
+     * A wrapper for {@link LogWriter#persist(TransactionLog.LayerInfo, LogEntry)} that throws runtime errors on failure
+     * 
+     * @param  layerInfo the in-memory layer information about the record being persisted
+     * @param  entry     the entry to persist
+     * @return           an {@link EntryInfo} instance with details of the entry that was just persisted
+     */
+    private EntryInfo tryPersist(TransactionLog.LayerInfo layerInfo, LogEntry entry) {
+        try {
+            return persist(layerInfo, entry);
+        } catch (IOException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Updates the state of an entry (i.e.: to mark it after it has been successfully processed)
+     * 
+     * @param  entryInfo   the entry information about the entry being updated
+     * @param  state       the state to update the entry to
+     * @throws IOException in case of lower-level I/O errors
+     */
+    public void updateState(EntryInfo.CachedEntryInfo entryInfo, LogEntry.EntryState state) throws IOException {
+        final TransactionLog.LayerInfo layerInfo = entryInfo.getLayerInfo();
+
+        /*
+         If it has layer information, then it's a hot record kept in the cache. In this case, just
+         update the cache and let the LogSupervisor flush to disk.
+        
+         Trying to update a persisted entry here is not acceptable
+         */
+        assert layerInfo != null;
+
+        final LogEntry logEntry = transactionLog.update(layerInfo, state);
+
+        if (logEntry != null) {
+            persist(layerInfo, logEntry, entryInfo.getPosition());
+        }
+    }
+
+    /**
+     * Updates the state of an entry that has already been persisted to disk
+     * 
+     * @param  entry       the entry to update
+     * @param  state       the state to update the entry to
+     * @throws IOException if the buffer is too small for the entry or in case of lower-level I/O errors
+     */
+    public void updateState(PersistedLogEntry entry, LogEntry.EntryState state) throws IOException {
+        ByteBuffer updateBuffer = ByteBuffer.allocate(entry.size());
+
+        IOUtil.serialize(updateBuffer, state.getCode(), entry.getKeyMetadata(), entry.getKey(), entry.getValueMetadata(),
+                entry.getValue());
+
+        final EntryInfo entryInfo = entry.getEntryInfo();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Position: {} with key {}", entryInfo.getPosition(), new String(entry.getKey()));
+        }
+
+        long size = IOUtil.write(fileChannel, updateBuffer, entryInfo.getPosition());
+
+        if (size == 0) {
+            LOG.warn("No bytes written for the given record!");
+        }
+    }
+
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/PersistedLogEntry.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/PersistedLogEntry.java
new file mode 100644
index 00000000000..81ae2c9b8dd
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/PersistedLogEntry.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+/**
+ * This class represents a log entry that has been persisted to disk
+ */
+class PersistedLogEntry extends LogEntry {
+    private final EntryInfo entryInfo;
+
+    public PersistedLogEntry(EntryInfo entryInfo, EntryState entryState, int keyMetadata, byte[] key, int valueMetadata,
+                             byte[] value) {
+        super(entryState, keyMetadata, key, valueMetadata, value);
+
+        this.entryInfo = entryInfo;
+    }
+
+    public EntryInfo getEntryInfo() {
+        return entryInfo;
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/TransactionLog.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/TransactionLog.java
new file mode 100644
index 00000000000..c37aba04b61
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/TransactionLog.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents an in-memory transaction log. It's the source from where the log entries are saved to the channel
+ * and, subsequently, flushed to disk. It is backed by a layered circular-buffer (i.e.: a regular circular buffer that
+ * increments a layer index for every loop iteration). The entries are rolled over whenever the capacity reaches its
+ * maximum value, and updates to those rolled-over records will be silently discarded (even if in error). The layer
+ * information is not persisted to disk as it is relevant only for determining whether to update or discard records.
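+ * <p>
+ * For example (illustrative), with a capacity of 3: the first three entries land at layer 0, indexes 0 to 2; a
+ * fourth entry rolls the log over to layer 1, index 0, overwriting the first one. A late state update for that
+ * first entry (layer 0, index 0) is then discarded, because the log has already moved past it.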
+ */
+class TransactionLog {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionLog.class);
+
+    /**
+     * Contains the layer information for log entries
+     */
+    public static class LayerInfo {
+        private final int index;
+        private final int layer;
+        private final boolean isRollingOver;
+
+        public LayerInfo(int index, int layer, boolean isRollingOver) {
+            this.index = index;
+            this.layer = layer;
+            this.isRollingOver = isRollingOver;
+        }
+
+        public int getIndex() {
+            return index;
+        }
+
+        public int getLayer() {
+            return layer;
+        }
+
+        public boolean isRollingOver() {
+            return isRollingOver;
+        }
+
+        @Override
+        public String toString() {
+            return "LayerInfo{" +
+                   "index=" + index +
+                   ", layer=" + layer +
+                   ", isRollingOver=" + isRollingOver +
+                   '}';
+        }
+    }
+
+    /**
+     * A container for an in-memory entry that can be used to determine the layer where the record is as well as obtain
+     * its entry.
+     */
+    static class EntryContainer {
+        LayerInfo layerInfo;
+        LogEntry logEntry;
+
+        public EntryContainer(LayerInfo layerInfo, LogEntry logEntry) {
+            this.layerInfo = layerInfo;
+            this.logEntry = logEntry;
+        }
+    }
+
+    private final int maxCapacity;
+    private final EntryContainer[] logEntries;
+
+    private int currentIndex;
+    private int currentLayer;
+
+    /**
+     * Creates a new transaction log with the given capacity
+     * 
+     * @param capacity the capacity of the circular buffer that backs this in-memory transaction log
+     */
+    public TransactionLog(int capacity) {
+        this.maxCapacity = capacity;
+        logEntries = new EntryContainer[capacity];
+    }
+
+    /**
+     * Adds a new entry to the log
+     * 
+     * @param  logEntry the entry to add
+     * @return          The information about the layer in the buffer where the entry was added
+     */
+    public LayerInfo add(LogEntry logEntry) {
+        boolean rollingOver = false;
+        if (currentIndex >= maxCapacity) {
+            currentLayer++;
+            currentIndex = 0;
+            rollingOver = true;
+        }
+
+        final EntryContainer entryContainer
+                = new EntryContainer(new LayerInfo(currentIndex, currentLayer, rollingOver), logEntry);
+        logEntries[currentIndex] = entryContainer;
+        currentIndex++;
+
+        return entryContainer.layerInfo;
+    }
+
+    /**
+     * Given the information about an entry, it determines whether it can be updated or not
+     * 
+     * @param  transactionLogLayer the layer in the transaction log
+     * @param  transactionLogIndex the index in the transaction log
+     * @param  entryInfoLayer      the layer for the entry
+     * @param  entryInfoIndex      the index for the entry
+     * @return                     true if it can be updated or false otherwise
+     */
+    static boolean canUpdate(int transactionLogLayer, int transactionLogIndex, int entryInfoLayer, int entryInfoIndex) {
+        if (transactionLogLayer == entryInfoLayer) {
+            // Must make sure to not update beyond the current index
+            if (transactionLogIndex >= entryInfoIndex) {
+                return true;
+            }
+        }
+
+        if (transactionLogLayer > entryInfoLayer) {
+            if (transactionLogIndex < entryInfoIndex) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Given the layer information for an entry, it determines if it can be updated via
+     * {@link TransactionLog#canUpdate(int, int, int, int)}
+     * 
+     * @param  layerInfo the layer information
+     * @return           true if it can be updated or false otherwise
+     */
+    private boolean canUpdate(LayerInfo layerInfo) {
+        return canUpdate(currentLayer, currentIndex, layerInfo.getLayer(), layerInfo.getIndex());
+    }
+
+    /**
+     * Tries to update an entry in the in-memory transaction log
+     * 
+     * @param  layerInfo the layer information for the entry
+     * @param  state     the state to update the entry to
+     * @return           the updated entry or null if the record was rolled-over and discarded
+     */
+    public LogEntry update(LayerInfo layerInfo, LogEntry.EntryState state) {
+        if (layerInfo == null) {
+            if (state != LogEntry.EntryState.PROCESSED) {
+                LOG.warn(
+                        "Discarded an unprocessed record because the layer information is not available: it may have been rolled over");
+            }
+
+            return null;
+        }
+
+        if (canUpdate(layerInfo)) {
+            LOG.debug("Updating record with layer: {}", layerInfo);
+            EntryContainer container = logEntries[layerInfo.getIndex()];
+
+            container.logEntry.setEntryState(state);
+
+            return container.logEntry;
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Discarded a record because it has been rolled-over. Record layer: {}", layerInfo);
+            }
+
+            if (state == LogEntry.EntryState.FAILED) {
+                LOG.warn(
+                        "An update to a failed record was discarded because it has been rolled over (it may have taken too long to report back)");
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns a sequential stream of the entries
+     * 
+     * @return a sequential stream of the entries
+     */
+    public Stream<EntryContainer> stream() {
+        return Arrays.stream(logEntries);
+    }
+
+    /**
+     * Gets the current layer
+     * 
+     * @return the current layer
+     */
+    public int currentLayer() {
+        return currentLayer;
+    }
+}
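
The roll-over rules guarded by canUpdate() can be seen in a minimal sketch (not part of the commit). It assumes package-level access to the package-private TransactionLog and LogEntry classes, as the unit tests have, and uses an empty byte array as a placeholder value:

    package org.apache.camel.component.wal;

    class TransactionLogRollOverSketch {

        // Builds an entry with the same constructor arguments used by the tests:
        // state, key metadata, key, value metadata, value
        private static LogEntry newEntry(String key) {
            return new LogEntry(LogEntry.EntryState.NEW, 0, key.getBytes(), 0, new byte[0]);
        }

        public static void main(String[] args) {
            TransactionLog log = new TransactionLog(3);

            TransactionLog.LayerInfo first = log.add(newEntry("record-0")); // layer 0, index 0
            log.add(newEntry("record-1"));                                  // layer 0, index 1
            log.add(newEntry("record-2"));                                  // layer 0, index 2
            log.add(newEntry("record-3"));                                  // buffer full: rolls over to layer 1, index 0

            // The slot that held record-0 was reused during the roll-over, so the update is discarded
            LogEntry updated = log.update(first, LogEntry.EntryState.PROCESSED);
            assert updated == null;
        }
    }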
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
new file mode 100644
index 00000000000..3bd1b501777
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.support.resume.OffsetKeys;
+import org.apache.camel.support.resume.Offsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resume strategy that uses a write-ahead strategy to keep a transaction log of the in-processing and processed
+ * records. This strategy works by wrapping another strategy. This increases the reliability of the resume API by
+ * ensuring that records are saved locally before being sent to the remote data storage used by the resume strategy,
+ * thus guaranteeing that records can be recovered in case of a crash of that system.
+ *
+ * Among other things, it implements data recovery on startup, so that records cached locally are automatically
+ * recovered.
+ */
+public class WriteAheadResumeStrategy implements ResumeStrategy {
+
+    /**
+     * An update callback that works for this strategy as well as for the delegate resume strategy that is wrapped in
+     * the WriteAheadResumeStrategy
+     */
+    private static class DelegateCallback implements UpdateCallBack {
+        private final UpdateCallBack updateCallBack;
+        private final UpdateCallBack flushCallBack;
+
+        public DelegateCallback(UpdateCallBack updateCallBack, UpdateCallBack flushCallBack) {
+            this.updateCallBack = updateCallBack;
+            this.flushCallBack = flushCallBack;
+        }
+
+        @Override
+        public void onUpdate(Throwable throwable) {
+            flushCallBack.onUpdate(throwable);
+            updateCallBack.onUpdate(throwable);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(WriteAheadResumeStrategy.class);
+    private final File logFile;
+    private final LogWriter logWriter;
+    private final ResumeStrategy resumeStrategy;
+
+    /**
+     * Creates a new write-ahead resume strategy
+     * 
+     * @param  logFile        the transaction log file
+     * @param  resumeStrategy the delegate resume strategy wrapped by this one
+     * @throws IOException
+     */
+    public WriteAheadResumeStrategy(File logFile, ResumeStrategy resumeStrategy) throws IOException {
+        this.logFile = logFile;
+        this.resumeStrategy = resumeStrategy;
+
+        DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(100);
+        logWriter = new LogWriter(logFile, flushPolicy);
+    }
+
+    @Override
+    public void setAdapter(ResumeAdapter adapter) {
+        resumeStrategy.setAdapter(adapter);
+    }
+
+    @Override
+    public ResumeAdapter getAdapter() {
+        return resumeStrategy.getAdapter();
+    }
+
+    @Override
+    public <T extends Resumable> void updateLastOffset(T offset) throws Exception {
+        updateLastOffset(offset, null);
+    }
+
+    @Override
+    public <T extends Resumable> void updateLastOffset(T offset, UpdateCallBack updateCallBack) throws Exception {
+        OffsetKey<?> offsetKey = offset.getOffsetKey();
+        Offset<?> offsetValue = offset.getLastOffset();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating the offset with key {} to {}", offsetKey.getValue(), offsetValue.getValue());
+        }
+
+        updateLastOffset(offsetKey, offsetValue, updateCallBack);
+    }
+
+    /**
+     * Handles the result of an offset update for cached entries (i.e.: those kept on the in-memory transaction log)
+     * 
+     * @param entryInfo the information about the entry that was updated
+     * @param t         an instance of the throwable thrown by the delegate resume strategy during the update, or
+     *                  null if none was thrown
+     */
+    private void handleResult(EntryInfo.CachedEntryInfo entryInfo, Throwable t) {
+        try {
+            if (t == null) {
+                logWriter.updateState(entryInfo, LogEntry.EntryState.PROCESSED);
+            } else {
+                logWriter.updateState(entryInfo, LogEntry.EntryState.FAILED);
+            }
+        } catch (IOException e) {
+            if (t == null) {
+                LOG.error("Unable to update state: {}", e.getMessage(), e);
+            } else {
+                LOG.error("Unable to mark the record as failed: {}", e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Handles the result of an offset update for persisted entries (i.e.: those already saved to permanent storage)
+     * 
+     * @param entry the information about the entry that was updated
+     * @param t     an instance of the throwable thrown by the delegate resume strategy during the update, or null
+     *              if none was thrown
+     */
+    private void handleResult(PersistedLogEntry entry, Throwable t) {
+        try {
+            if (t == null) {
+                logWriter.updateState(entry, LogEntry.EntryState.PROCESSED);
+            } else {
+                logWriter.updateState(entry, LogEntry.EntryState.FAILED);
+            }
+        } catch (IOException e) {
+            if (t == null) {
+                LOG.error("Unable to update state: {}", e.getMessage(), e);
+            } else {
+                LOG.error("Unable to mark the record as failed: {}", e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offsetValue) throws Exception {
+        updateLastOffset(offsetKey, offsetValue, null);
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offsetValue, UpdateCallBack updateCallBack)
+            throws Exception {
+        ByteBuffer keyBuffer = offsetKey.serialize();
+        ByteBuffer valueBuffer = offsetValue.serialize();
+
+        EntryInfo.CachedEntryInfo entryInfo;
+        try {
+            LogEntry entry = new LogEntry(
+                    LogEntry.EntryState.NEW, 0,
+                    keyBuffer.array(), 0, valueBuffer.array());
+
+            entryInfo = logWriter.append(entry);
+        } catch (IOException e) {
+            LOG.error("Unable to append a new record to the transaction log. The system will try to update the record " +
+                      "on the delegate strategy before forcing the failure");
+
+            tryUpdateDelegate(offsetKey, offsetValue, (EntryInfo.CachedEntryInfo) null, updateCallBack);
+            throw e;
+        }
+
+        tryUpdateDelegate(offsetKey, offsetValue, entryInfo, updateCallBack);
+    }
+
+    /**
+     * Tries to update the offset in the delegate strategy, ensuring the entry in the log reflects the success or failure of
+     * the update request
+     * 
+     * @param  offsetKey      the offset key to update
+     * @param  offsetValue    the offset value to update
+     * @param  entryInfo      the information about the entry being updated
+     * @param  updateCallBack a callback to be executed after the update has occurred (null if not available)
+     * @throws Exception
+     */
+    private void tryUpdateDelegate(
+            OffsetKey<?> offsetKey, Offset<?> offsetValue, EntryInfo.CachedEntryInfo entryInfo, UpdateCallBack updateCallBack)
+            throws Exception {
+        try {
+            UpdateCallBack delegateCallback = resolveUpdateCallBack(entryInfo, updateCallBack);
+
+            resumeStrategy.updateLastOffset(offsetKey, offsetValue, delegateCallback);
+        } catch (Throwable throwable) {
+            if (entryInfo != null) {
+                logWriter.updateState(entryInfo, LogEntry.EntryState.FAILED);
+            } else {
+                LOG.warn("Not updating the state on the transaction log because there's no entry information: it's likely " +
+                         "that a previous attempt to append the record has failed and the system is now in error");
+            }
+
+            throw throwable;
+        }
+    }
+
+    /**
+     * Tries to update the offset in the delegate strategy, ensuring the entry in the log reflects the success or failure of
+     * the update request
+     * 
+     * @param  offsetKey      the offset key to update
+     * @param  offsetValue    the offset value to update
+     * @param  entry          the entry being updated
+     * @param  updateCallBack a callback to be executed after the update has occurred (null if not available)
+     * @throws Exception
+     */
+    private void tryUpdateDelegate(
+            OffsetKey<?> offsetKey, Offset<?> offsetValue, PersistedLogEntry entry, UpdateCallBack updateCallBack)
+            throws Exception {
+        try {
+            UpdateCallBack delegateCallback = resolveUpdateCallBack(entry, updateCallBack);
+
+            resumeStrategy.updateLastOffset(offsetKey, offsetValue, delegateCallback);
+        } catch (Throwable throwable) {
+            logWriter.updateState(entry, LogEntry.EntryState.FAILED);
+
+            throw throwable;
+        }
+    }
+
+    private UpdateCallBack resolveUpdateCallBack(EntryInfo.CachedEntryInfo entryInfo, UpdateCallBack updateCallBack) {
+        if (updateCallBack == null) {
+            return t -> handleResult(entryInfo, t);
+        } else {
+            return new DelegateCallback(updateCallBack, t -> handleResult(entryInfo, t));
+        }
+    }
+
+    private UpdateCallBack resolveUpdateCallBack(PersistedLogEntry entry, UpdateCallBack updateCallBack) {
+        if (updateCallBack == null) {
+            return t -> handleResult(entry, t);
+        } else {
+            return new DelegateCallback(updateCallBack, t -> handleResult(entry, t));
+        }
+    }
+
+    @Override
+    public void loadCache() throws Exception {
+        LOG.debug("Loading cache for the delegate strategy");
+        resumeStrategy.loadCache();
+        LOG.debug("Done loading cache for the delegate strategy");
+
+        try (LogReader reader = new LogReader(logFile)) {
+
+            int updatedCount = 0;
+            LOG.trace("Starting to read log entries");
+            PersistedLogEntry logEntry;
+            do {
+                logEntry = reader.readEntry();
+                if (logEntry != null) {
+                    final LogEntry.EntryState entryState = logEntry.getEntryState();
+                    if (entryState == LogEntry.EntryState.NEW || entryState == LogEntry.EntryState.FAILED) {
+                        final ResumeAdapter adapter = resumeStrategy.getAdapter();
+
+                        if (adapter instanceof Deserializable) {
+                            Deserializable deserializable = (Deserializable) adapter;
+
+                            Object oKey = deserializable.deserializeKey(ByteBuffer.wrap(logEntry.getKey()));
+                            Object value = deserializable.deserializeValue(ByteBuffer.wrap(logEntry.getValue()));
+
+                            tryUpdateDelegate(OffsetKeys.of(oKey), Offsets.of(value), logEntry, null);
+                            updatedCount++;
+                        }
+                    }
+                }
+
+            } while (logEntry != null);
+            LOG.trace("Finished reading log entries");
+
+            if (updatedCount == 0) {
+                logWriter.reset();
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        resumeStrategy.start();
+    }
+
+    @Override
+    public void stop() {
+        LOG.trace("Stopping the delegate strategy");
+        resumeStrategy.stop();
+        LOG.trace("Done stopping the delegate strategy");
+
+        LOG.trace("Closing the writer");
+        logWriter.close();
+        LOG.trace("Writer is closed");
+    }
+}
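
A minimal usage sketch (not part of the commit) of how the write-ahead strategy wraps a delegate; the class name, file location and delegate are placeholders for whatever the application configures:

    import java.io.File;

    import org.apache.camel.component.wal.WriteAheadResumeStrategy;
    import org.apache.camel.resume.ResumeStrategy;

    public class WriteAheadUsageSketch {

        public static WriteAheadResumeStrategy wrap(ResumeStrategy delegate) throws Exception {
            File logFile = new File("offsets.data"); // local transaction log

            WriteAheadResumeStrategy strategy = new WriteAheadResumeStrategy(logFile, delegate);

            strategy.start();     // starts the wrapped delegate
            strategy.loadCache(); // loads the delegate cache and replays NEW/FAILED records from the local log

            // Offset updates are appended to the local log first and only then forwarded to the delegate;
            // the entry is later marked PROCESSED or FAILED depending on the outcome reported by the callback.
            // strategy.updateLastOffset(offsetKey, offsetValue);

            return strategy; // call stop() on shutdown to stop the delegate and close the log writer
        }
    }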
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/BufferOverflow.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/BufferOverflow.java
new file mode 100644
index 00000000000..dc23fabafa9
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/BufferOverflow.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal.exceptions;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+
+public class BufferOverflow extends IOException {
+    private final int remaining;
+    private final int requested;
+
+    public BufferOverflow(int remaining, int requested) {
+        super(format("There is not enough space on the buffer for an offset entry: %d bytes remaining, %d bytes needed",
+                remaining, requested));
+
+        this.remaining = remaining;
+        this.requested = requested;
+    }
+
+    public int getRemaining() {
+        return remaining;
+    }
+
+    public int getRequested() {
+        return requested;
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/BufferTooSmallException.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/BufferTooSmallException.java
new file mode 100644
index 00000000000..b38ef6b9e9c
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/BufferTooSmallException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal.exceptions;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+
+public class BufferTooSmallException extends IOException {
+    private final int remaining;
+    private final int requested;
+
+    public BufferTooSmallException(int remaining, int requested) {
+        super(format("There is not enough space on the buffer for an offset entry: %d bytes remaining, %d bytes needed",
+                remaining, requested));
+
+        this.remaining = remaining;
+        this.requested = requested;
+    }
+
+    public int getRemaining() {
+        return remaining;
+    }
+
+    public int getRequested() {
+        return requested;
+    }
+}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/InvalidRecordException.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/InvalidRecordException.java
new file mode 100644
index 00000000000..272bb9ce3a0
--- /dev/null
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/exceptions/InvalidRecordException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal.exceptions;
+
+public class InvalidRecordException extends RuntimeException {
+
+    public InvalidRecordException(String message) {
+        super(message);
+    }
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogReaderTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogReaderTest.java
new file mode 100644
index 00000000000..656e2116fbb
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogReaderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LogReaderTest extends LogTestBase {
+
+    @Test
+    public void testHeader() {
+        Assertions.assertDoesNotThrow(() -> generateDataFilePredictable());
+        File reportFile = new File(testDir, "test.data");
+        Assumptions.assumeTrue(reportFile.exists());
+
+        try (LogReader reader = new LogReader(reportFile)) {
+
+            Header fileHeader = reader.getHeader();
+            assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim());
+            assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion());
+        } catch (IOException e) {
+            Assertions.fail(e.getMessage());
+        }
+    }
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogTestBase.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogTestBase.java
new file mode 100644
index 00000000000..7f7a7d0d94a
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogTestBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class LogTestBase {
+    protected static final long RECORD_COUNT = TimeUnit.HOURS.toSeconds(1);
+    private static final Logger LOG = LoggerFactory.getLogger(LogTestBase.class);
+
+    @TempDir
+    protected File testDir;
+
+    protected static LogEntry createNewLogEntry(List<Instant> values, int i) {
+        String keyData = "record-" + i;
+        ByteBuffer value = ByteBuffer.allocate(Long.BYTES);
+        Instant now = Instant.now();
+        value.putLong(now.toEpochMilli());
+
+        if (values != null) {
+            values.add(now);
+        }
+
+        LogEntry entry = new LogEntry(
+                LogEntry.EntryState.NEW, 0,
+                keyData.getBytes(), 0, value.array());
+        return entry;
+    }
+
+    protected List<Instant> generateDataFilePredictable(
+            Consumer<EntryInfo.CachedEntryInfo> offsetConsumer, LogWriter logWriter, long total)
+            throws IOException {
+        List<Instant> values = new ArrayList<>();
+
+        LOG.debug("Number of records to write: {}", total);
+        for (int i = 0; i < total; i++) {
+            LogEntry entry = createNewLogEntry(values, i);
+
+            final EntryInfo.CachedEntryInfo entryInfo = logWriter.append(entry);
+            if (offsetConsumer != null) {
+                offsetConsumer.accept(entryInfo);
+            }
+        }
+
+        return values;
+    }
+
+    protected List<Instant> generateDataFilePredictable(Consumer<EntryInfo.CachedEntryInfo> offsetConsumer, LogWriter logWriter)
+            throws IOException {
+        return generateDataFilePredictable(offsetConsumer, logWriter, RECORD_COUNT);
+    }
+
+    protected List<Instant> generateDataFilePredictable(Consumer<EntryInfo.CachedEntryInfo> offsetConsumer) throws IOException {
+        File reportFile = new File(testDir, "test.data");
+        final DefaultLogSupervisor scheduledFlushPolicy = new DefaultLogSupervisor(100);
+        try (LogWriter logWriter = new LogWriter(reportFile, scheduledFlushPolicy)) {
+            return generateDataFilePredictable(offsetConsumer, logWriter);
+        }
+    }
+
+    protected List<Instant> generateDataFilePredictable() throws IOException {
+        return generateDataFilePredictable(null);
+    }
+
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverTest.java
new file mode 100644
index 00000000000..f79d6b4969c
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogWriterRollOverTest extends LogTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(LogWriterRollOverTest.class);
+
+    @ParameterizedTest
+    @ValueSource(ints = { 1, 5, 10, 100, 3599, 3600 })
+    public void testReadWriteRecordsWithRollOver(int maxRecordCount) throws IOException {
+        readWriteTest(maxRecordCount, maxRecordCount);
+    }
+
+    @Test
+    public void testReadWriteRecordsWithRollOverDoesNotExceedSize() throws IOException {
+        int maxRecordCount = (int) RECORD_COUNT + 1;
+        readWriteTest((int) RECORD_COUNT, maxRecordCount);
+    }
+
+    private void readWriteTest(int expectedRecordCount, int maxRecordCount) throws IOException {
+        File reportFile = new File(testDir, "test.data");
+
+        try (LogWriter logWriter = new LogWriter(reportFile, new DefaultLogSupervisor(100), maxRecordCount)) {
+            Assertions.assertDoesNotThrow(() -> generateDataFilePredictable(null, logWriter));
+        }
+
+        long total = TimeUnit.HOURS.toSeconds(1);
+        try (LogReader reader = new LogReader(reportFile, (int) total * 100)) {
+
+            Header fileHeader = reader.getHeader();
+            assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim());
+            assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion());
+
+            int count = 0;
+            PersistedLogEntry entry = reader.readEntry();
+            while (entry != null) {
+                assertEquals(LogEntry.EntryState.NEW, entry.getEntryState());
+                assertEquals(0, entry.getKeyMetadata());
+                assertEquals(0, entry.getValueMetadata());
+
+                String key = new String(entry.getKey());
+                LOG.debug("Received record: {}", key);
+                Assertions.assertTrue(key.startsWith("record-"));
+
+                ByteBuffer buffer = ByteBuffer.wrap(entry.getValue());
+
+                Assertions.assertTrue(buffer.getLong() > 0);
+
+                count++;
+
+                entry = reader.readEntry();
+            }
+
+            Assertions.assertEquals(expectedRecordCount, count, "The number of records doesn't match");
+        }
+    }
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAfterDiscardTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAfterDiscardTest.java
new file mode 100644
index 00000000000..305e7f054b8
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAfterDiscardTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogWriterRollOverUpdateAfterDiscardTest extends LogTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(LogWriterRollOverUpdateAfterDiscardTest.class);
+
+    LogWriter logWriter;
+    File reportFile;
+    final List<EntryInfo.CachedEntryInfo> entryInfos = new ArrayList<>();
+
+    @BeforeEach
+    void setup() throws IOException, ExecutionException, InterruptedException {
+        reportFile = new File(testDir, "test.data");
+
+        logWriter = new LogWriter(reportFile, new DefaultLogSupervisor(100), 100);
+
+        generateDataFilePredictable(entryInfos::add, logWriter);
+
+        logWriter.flush();
+
+        Executors.newSingleThreadExecutor().submit(this::markRecordsAsCommitted).get();
+    }
+
+    private void markRecordsAsCommitted() {
+        for (EntryInfo.CachedEntryInfo entryInfo : entryInfos) {
+            try {
+                logWriter.updateState(entryInfo, LogEntry.EntryState.PROCESSED);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        logWriter.flush();
+        logWriter.close();
+    }
+
+    @Test
+    void testReadWriteUpdateRecordsWithRollOver() throws IOException {
+        try (LogReader reader = new LogReader(reportFile, (int) RECORD_COUNT * 100)) {
+
+            Header fileHeader = reader.getHeader();
+            assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim());
+            assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion());
+
+            int count = 0;
+            PersistedLogEntry entry = reader.readEntry();
+            while (entry != null) {
+                LOG.debug("Read state: {}", entry.getEntryState());
+                assertEquals(LogEntry.EntryState.PROCESSED, entry.getEntryState());
+                assertEquals(0, entry.getKeyMetadata());
+                assertEquals(0, entry.getValueMetadata());
+
+                String key = new String(entry.getKey());
+                LOG.debug("Read record: {}", key);
+                Assertions.assertTrue(key.startsWith("record-"));
+
+                ByteBuffer buffer = ByteBuffer.wrap(entry.getValue());
+
+                Assertions.assertTrue(buffer.getLong() > 0);
+
+                count++;
+
+                entry = reader.readEntry();
+            }
+
+            Assertions.assertEquals(100, count, "The number of records doesn't match");
+        }
+    }
+
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java
new file mode 100644
index 00000000000..c0f292e1e2d
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+abstract class LogWriterRollOverUpdateAsyncBase extends LogTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(LogWriterRollOverUpdateAsyncBase.class);
+    protected BlockingQueue<EntryInfo.CachedEntryInfo> entryInfos;
+    protected CountDownLatch latch = new CountDownLatch(1);
+    LogWriter logWriter;
+    File reportFile;
+
+    @BeforeEach
+    void setup() throws IOException {
+        reportFile = new File(testDir, "test.data");
+
+        logWriter = new LogWriter(reportFile, new DefaultLogSupervisor(100), 100);
+
+    }
+
+    private void markRecordsAsCommitted() {
+        for (int i = 0; i < RECORD_COUNT; i++) {
+
+            try {
+                LOG.trace("Updating ...");
+                final EntryInfo.CachedEntryInfo entryInfo = entryInfos.take();
+
+                logWriter.updateState(entryInfo, LogEntry.EntryState.PROCESSED);
+            } catch (IOException | InterruptedException e) {
+                LOG.error("Failed to update state: {}", e.getMessage(), e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    abstract void asyncGenerate();
+
+    @AfterEach
+    void tearDown() throws IOException {
+        logWriter.flush();
+        logWriter.close();
+        entryInfos.clear();
+    }
+
+    protected void runTest(int queueCapacity) throws IOException, InterruptedException {
+        entryInfos = new ArrayBlockingQueue<>(queueCapacity);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+        final Future<?> generateTask = executorService.submit(this::asyncGenerate);
+
+        final Future<?> updateTask = executorService.submit(this::markRecordsAsCommitted);
+
+        Assertions.assertTrue(latch.await(1, TimeUnit.MINUTES), "Failed to generate records within 1 minute");
+
+        try (LogReader reader = new LogReader(reportFile, (int) RECORD_COUNT * 100)) {
+
+            Header fileHeader = reader.getHeader();
+            assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim());
+            assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion());
+
+            int count = 0;
+            PersistedLogEntry entry = reader.readEntry();
+            while (entry != null) {
+                LOG.debug("Read state: {}", entry.getEntryState());
+                assertEquals(LogEntry.EntryState.PROCESSED, entry.getEntryState());
+                assertEquals(0, entry.getKeyMetadata());
+                assertEquals(0, entry.getValueMetadata());
+
+                String key = new String(entry.getKey());
+                LOG.debug("Read record: {}", key);
+                Assertions.assertTrue(key.startsWith("record-"));
+
+                ByteBuffer buffer = ByteBuffer.wrap(entry.getValue());
+
+                Assertions.assertTrue(buffer.getLong() > 0);
+
+                count++;
+
+                entry = reader.readEntry();
+            }
+
+            Assertions.assertEquals(100, count, "The number of records doesn't match");
+        }
+    }
+
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java
new file mode 100644
index 00000000000..60ebd791940
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogWriterRollOverUpdateAsyncTest extends LogWriterRollOverUpdateAsyncBase {
+    private static final Logger LOG = LoggerFactory.getLogger(LogWriterRollOverUpdateAsyncTest.class);
+
+    protected void asyncGenerate() {
+        try {
+            LOG.trace("Generating ...");
+            generateDataFilePredictable(e -> {
+                LOG.debug("Putting into the queue: {}", e);
+                entryInfos.add(e);
+            }, logWriter);
+            LOG.trace("Done generating records");
+        } catch (IOException e) {
+            LOG.error("Failed to generate records: {}", e.getMessage(), e);
+            throw new RuntimeException(e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /*
+     * This test exercises the async update path with a queue large enough to avoid contention between the threads.
+     */
+    @DisplayName("Test the async update process with no (significant) contention")
+    @Test
+    void testReadWriteUpdateRecordsWithRollOver() throws IOException, InterruptedException {
+        runTest((int) RECORD_COUNT + 1);
+    }
+
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java
new file mode 100644
index 00000000000..0e19adee043
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogWriterRollOverUpdateAsyncWithContentionTest extends LogWriterRollOverUpdateAsyncBase {
+    private static final Logger LOG = LoggerFactory.getLogger(LogWriterRollOverUpdateAsyncWithContentionTest.class);
+
+    protected void asyncGenerate() {
+        try {
+            LOG.trace("Generating ...");
+            generateDataFilePredictable(e -> {
+                try {
+                    LOG.debug("Putting into the queue: {}", e);
+                    entryInfos.put(e);
+                } catch (InterruptedException ex) {
+                    LOG.error("Interrupted while putting record into the queue: {}", ex.getMessage(), ex);
+                    throw new RuntimeException(ex);
+                }
+            }, logWriter);
+            LOG.trace("Done generating records");
+        } catch (IOException e) {
+            LOG.error("Failed to generate records: {}", e.getMessage(), e);
+            throw new RuntimeException(e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @DisplayName("Test the async update process with different levels of contention")
+    @ParameterizedTest
+    @ValueSource(ints = { 1, 2, 5, 10, 200, 500, 3000, 4000 })
+    void testReadWriteUpdateRecordsWithRollOver(int queueCapacity) throws IOException, InterruptedException {
+        runTest(queueCapacity);
+    }
+
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterSyncUpdateTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterSyncUpdateTest.java
new file mode 100644
index 00000000000..97bcb1036ad
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterSyncUpdateTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LogWriterSyncUpdateTest extends LogTestBase {
+
+    @Test
+    public void testReadWriteUpdateRecords() throws IOException {
+        final List<EntryInfo.CachedEntryInfo> entryInfos = new ArrayList<>();
+        final DefaultLogSupervisor scheduledFlushPolicy = new DefaultLogSupervisor(100);
+        File reportFile = new File(testDir, "test.data");
+        final List<Instant> values;
+
+        try (LogWriter logWriter = new LogWriter(reportFile, scheduledFlushPolicy)) {
+            values = Assertions.assertDoesNotThrow(() -> generateDataFilePredictable(entryInfos::add, logWriter));
+
+            for (EntryInfo.CachedEntryInfo entryInfo : entryInfos) {
+                logWriter.updateState(entryInfo, LogEntry.EntryState.PROCESSED);
+            }
+        }
+        Assumptions.assumeTrue(reportFile.exists());
+
+        long total = TimeUnit.HOURS.toSeconds(1);
+        try (LogReader reader = new LogReader(reportFile, (int) total * 100)) {
+
+            Header fileHeader = reader.getHeader();
+            assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim());
+            assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion());
+
+            int count = 0;
+            PersistedLogEntry entry = reader.readEntry();
+            while (entry != null) {
+                if (entry != null) {
+                    assertEquals(LogEntry.EntryState.PROCESSED, entry.getEntryState());
+                    assertEquals(0, entry.getKeyMetadata());
+                    assertEquals(0, entry.getValueMetadata());
+
+                    String key = new String(entry.getKey());
+                    Assertions.assertEquals("record-" + count, key);
+
+                    ByteBuffer buffer = ByteBuffer.wrap(entry.getValue());
+                    Assertions.assertEquals(values.get(count).toEpochMilli(), buffer.getLong());
+
+                    count++;
+                }
+
+                entry = reader.readEntry();
+            }
+
+            Assertions.assertEquals(TimeUnit.HOURS.toSeconds(1), count, "The number of records doesn't match");
+        }
+    }
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterTest.java
new file mode 100644
index 00000000000..9c302b70064
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class LogWriterTest extends LogTestBase {
+
+    @Test
+    public void testReadWriteRecords() throws IOException {
+        final List<Instant> values = Assertions.assertDoesNotThrow(() -> generateDataFilePredictable());
+        File reportFile = new File(testDir, "test.data");
+        Assumptions.assumeTrue(reportFile.exists());
+
+        long total = TimeUnit.HOURS.toSeconds(1);
+        try (LogReader reader = new LogReader(reportFile, (int) total * 100)) {
+
+            Header fileHeader = reader.getHeader();
+            assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim());
+            assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion());
+
+            int count = 0;
+            PersistedLogEntry entry = reader.readEntry();
+            while (entry != null) {
+                if (entry != null) {
+                    assertEquals(LogEntry.EntryState.NEW, entry.getEntryState());
+                    assertEquals(0, entry.getKeyMetadata());
+                    assertEquals(0, entry.getValueMetadata());
+
+                    String key = new String(entry.getKey());
+                    Assertions.assertEquals("record-" + count, key);
+
+                    ByteBuffer buffer = ByteBuffer.wrap(entry.getValue());
+                    Assertions.assertEquals(values.get(count).toEpochMilli(), buffer.getLong());
+
+                    count++;
+                }
+
+                entry = reader.readEntry();
+            }
+
+            Assertions.assertEquals(TimeUnit.HOURS.toSeconds(1), count, "The number of records doesn't match");
+        }
+    }
+
+}
diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/TransactionLogTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/TransactionLogTest.java
new file mode 100644
index 00000000000..b9d3a1c8623
--- /dev/null
+++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/TransactionLogTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.wal;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionLogTest {
+    @DisplayName("Tests that can update records on the same layer")
+    @Test
+    void testCanUpdateSameLayer() {
+        assertTrue(TransactionLog.canUpdate(0, 10, 0, 1));
+        assertTrue(TransactionLog.canUpdate(0, 10, 0, 10));
+        assertFalse(TransactionLog.canUpdate(0, 10, 0, 11));
+    }
+
+    @DisplayName("Tests that prevent updating a record that has been rolled-over")
+    @Test
+    void testCannotUpdateRolledOverRecord() {
+        assertFalse(TransactionLog.canUpdate(1, 3, 0, 1));
+        assertFalse(TransactionLog.canUpdate(1, 3, 0, 3));
+    }
+
+    @DisplayName("Tests that can update records after a roll-over has started")
+    @Test
+    void testCanUpdateDifferentLayerWithRollOver() {
+        assertTrue(TransactionLog.canUpdate(1, 10, 0, 11));
+        assertTrue(TransactionLog.canUpdate(1, 10, 1, 10));
+        assertFalse(TransactionLog.canUpdate(1, 10, 0, 10));
+    }
+}
diff --git a/components/pom.xml b/components/pom.xml
index a51d8904b9a..f4722eff281 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -346,7 +346,8 @@
         <module>camel-rest-swagger</module>
         <!-- build jooq last as it cause component builds to be slower afterwards -->
         <module>camel-jooq</module>
-  </modules>
+        <module>camel-wal</module>
+    </modules>
 
     <properties>
         <camel.osgi.manifest>${project.build.outputDirectory}/META-INF/MANIFEST.MF</camel.osgi.manifest>
diff --git a/core/camel-support/src/test/resources/log4j2.properties b/core/camel-support/src/test/resources/log4j2.properties
index 2f560dd6305..76fc1706599 100644
--- a/core/camel-support/src/test/resources/log4j2.properties
+++ b/core/camel-support/src/test/resources/log4j2.properties
@@ -30,3 +30,9 @@ rootLogger.level = INFO
 
 rootLogger.appenderRef.file.ref = file
 #rootLogger.appenderRef.console.ref = console
+
+logger.camelSupport.name=org.apache.camel.support
+logger.camelSupport.level=INFO
+
+logger.camelSupportResume.name=org.apache.camel.support.resume
+logger.camelSupportResume.level=DEBUG
diff --git a/docs/components/modules/others/examples/json/wal.json b/docs/components/modules/others/examples/json/wal.json
new file mode 120000
index 00000000000..bf9e11bebc0
--- /dev/null
+++ b/docs/components/modules/others/examples/json/wal.json
@@ -0,0 +1 @@
+../../../../../../components/camel-wal/src/generated/resources/wal.json
\ No newline at end of file
diff --git a/docs/components/modules/others/nav.adoc b/docs/components/modules/others/nav.adoc
index 5b233eb1585..7563eaa6f6a 100644
--- a/docs/components/modules/others/nav.adoc
+++ b/docs/components/modules/others/nav.adoc
@@ -63,4 +63,5 @@
 ** xref:threadpoolfactory-vertx.adoc[ThreadPoolFactory Vert.x]
 ** xref:tracing.adoc[Tracing]
 ** xref:undertow-spring-security.adoc[Undertow Spring Security]
+** xref:wal-docs.adoc[Write Ahead Log Strategy]
 ** xref:zipkin.adoc[Zipkin]
diff --git a/docs/components/modules/others/pages/wal-docs.adoc b/docs/components/modules/others/pages/wal-docs.adoc
new file mode 120000
index 00000000000..1a98f1860eb
--- /dev/null
+++ b/docs/components/modules/others/pages/wal-docs.adoc
@@ -0,0 +1 @@
+../../../../../components/camel-wal/src/main/docs/wal-docs.adoc
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index 2b9f28c6ca3..f4cbc13cc62 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -2699,6 +2699,11 @@
 				<artifactId>camel-vm</artifactId>
 				<version>${project.version}</version>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.camel</groupId>
+				<artifactId>camel-wal</artifactId>
+				<version>${project.version}</version>
+			</dependency>
 			<dependency>
 				<groupId>org.apache.camel</groupId>
 				<artifactId>camel-weather</artifactId>