Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/19 17:11:09 UTC

[kafka] branch trunk updated: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch (#10276)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a19806f  KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch (#10276)
a19806f is described below

commit a19806f2628e79baa2dbf84ea307787af7ca186f
Author: Rohit Deshpande <ro...@gmail.com>
AuthorDate: Fri Mar 19 10:09:38 2021 -0700

    KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch (#10276)
    
    Improves test coverage of `validateOffsetAndEpoch`.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 190 +++++++++++++++++++--
 .../org/apache/kafka/raft/KafkaRaftClient.java     |   4 +-
 .../org/apache/kafka/raft/ValidOffsetAndEpoch.java |  34 +++-
 .../java/org/apache/kafka/raft/MockLogTest.java    | 137 +++++++++++++++
 4 files changed, 341 insertions(+), 24 deletions(-)
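
For context, the tests added below all exercise one contract: `validateOffsetAndEpoch` classifies a fetch position as VALID, DIVERGING, or SNAPSHOT, and callers branch on that classification. The following is a minimal sketch (not part of this commit) of how a caller such as KafkaRaftClient consumes the result; the local variable names are illustrative only.

    ValidOffsetAndEpoch result = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
    switch (result.kind()) {
        case VALID:
            // (fetchOffset, lastFetchedEpoch) is consistent with the log; serve the read
            break;
        case DIVERGING:
            // the fetcher's log diverges from ours; result.offsetAndEpoch() gives the
            // largest consistent offset/epoch so the fetcher can truncate
            break;
        case SNAPSHOT:
            // the requested position precedes the oldest snapshot; result.offsetAndEpoch()
            // identifies the snapshot the fetcher should load instead
            break;
    }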

diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index bc7d110..ffbba2a 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.internals.BatchBuilder
-import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
+import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog, ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -132,7 +132,7 @@ final class KafkaMetadataLogTest {
     assertEquals(offset, log.highWatermark.offset)
 
     val newRecords = 10
-    append(log, newRecords, epoch + 1, log.endOffset.offset)
+    append(log, newRecords, epoch + 1)
     // Start offset should not change since a new snapshot was not generated
     assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(offset + newRecords, epoch)))
     assertEquals(offset, log.startOffset)
@@ -154,7 +154,7 @@ final class KafkaMetadataLogTest {
       snapshot.freeze()
     }
 
-    append(log, offset, epoch, log.endOffset.offset)
+    append(log, offset, epoch)
     val newSnapshotId = new OffsetAndEpoch(offset * 2, epoch)
     TestUtils.resource(log.createSnapshot(newSnapshotId)) { snapshot =>
       snapshot.freeze()
@@ -232,7 +232,7 @@ final class KafkaMetadataLogTest {
 
     val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1)
 
-    append(log, numberOfRecords, epoch, log.endOffset.offset)
+    append(log, numberOfRecords, epoch)
 
     TestUtils.resource(log.createSnapshot(greaterEpochSnapshotId)) { snapshot =>
       snapshot.freeze()
@@ -258,20 +258,20 @@ final class KafkaMetadataLogTest {
       snapshot.freeze()
     }
 
-    append(log, 1, epoch, log.endOffset.offset)
+    append(log, 1, epoch)
     val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
     TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
       snapshot.freeze()
     }
 
-    append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+    append(log, numberOfRecords - 2, epoch)
     val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
     TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
       snapshot.freeze()
     }
 
     val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
-    append(log, numberOfRecords, epoch, log.endOffset.offset)
+    append(log, numberOfRecords, epoch)
     TestUtils.resource(log.createSnapshot(greaterSnapshotId)) { snapshot =>
       snapshot.freeze()
     }
@@ -307,7 +307,7 @@ final class KafkaMetadataLogTest {
 
     assertFalse(log.truncateToLatestSnapshot())
 
-    append(log, numberOfRecords, epoch, log.endOffset.offset)
+    append(log, numberOfRecords, epoch)
 
     val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
     TestUtils.resource(log.createSnapshot(olderOffsetSnapshotId)) { snapshot =>
@@ -366,20 +366,20 @@ final class KafkaMetadataLogTest {
       snapshot.freeze()
     }
 
-    append(log, 1, epoch, log.endOffset.offset)
+    append(log, 1, epoch)
     val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
     TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
       snapshot.freeze()
     }
 
-    append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+    append(log, numberOfRecords - 2, epoch)
     val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
     TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
       snapshot.freeze()
     }
 
     val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
-    append(log, numberOfRecords, epoch, log.endOffset.offset)
+    append(log, numberOfRecords, epoch)
     TestUtils.resource(log.createSnapshot(greaterSnapshotId)) { snapshot =>
       snapshot.freeze()
     }
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
+    assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): Unit = {
+    val offset = 10
+    val numOfRecords = 5
+
+    val log = buildMetadataLog(tempDir, mockTime)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+    val snapshotId = new OffsetAndEpoch(offset, 1)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    log.truncateToLatestSnapshot()
+
+
+    append(log, numOfRecords, epoch = 1)
+    append(log, numOfRecords, epoch = 2)
+    append(log, numOfRecords, epoch = 4)
+
+    // offset is not equal to oldest snapshot's offset
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanFirstEpochInLog(): Unit = {
+    val offset = 10
+    val numOfRecords = 5
+
+    val log = buildMetadataLog(tempDir, mockTime)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+    val snapshotId = new OffsetAndEpoch(offset, 1)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    log.truncateToLatestSnapshot()
+
+    append(log, numOfRecords, epoch = 3)
+
+    // offset is not equal to oldest snapshot's offset
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanLEO(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    append(log, numberOfRecords, epoch + 1)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, epoch)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(10, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateValidEpochAndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 5
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords - 1, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
 }
 
 object KafkaMetadataLogTest {
@@ -536,10 +700,10 @@ object KafkaMetadataLogTest {
     log
   }
 
-  def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int, initialOffset: Long = 0L): LogAppendInfo = {
+  def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo = {
     log.appendAsLeader(
       MemoryRecords.withRecords(
-        initialOffset,
+        log.endOffset().offset,
         CompressionType.NONE,
         epoch,
         (0 until numberOfRecords).map(number => new SimpleRecord(number.toString.getBytes)): _*
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 7a938c4..e02e20b2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -911,7 +911,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 .setLeaderEpoch(quorum.epoch())
                 .setLeaderId(quorum.leaderIdOrSentinel());
 
-            switch (validOffsetAndEpoch.type()) {
+            switch (validOffsetAndEpoch.kind()) {
                 case DIVERGING:
                     partitionData.divergingEpoch()
                         .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch)
@@ -1039,7 +1039,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
             final Records records;
-            if (validOffsetAndEpoch.type() == ValidOffsetAndEpoch.Type.VALID) {
+            if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                 LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
 
                 if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
index 5929e22..8f1d183 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-    final private Type type;
+    final private Kind kind;
     final private OffsetAndEpoch offsetAndEpoch;
 
-    ValidOffsetAndEpoch(Type type, OffsetAndEpoch offsetAndEpoch) {
-        this.type = type;
+    ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
+        this.kind = kind;
         this.offsetAndEpoch = offsetAndEpoch;
     }
 
-    public Type type() {
-        return type;
+    public Kind kind() {
+        return kind;
     }
 
     public OffsetAndEpoch offsetAndEpoch() {
         return offsetAndEpoch;
     }
 
-    public static enum Type {
+    public enum Kind {
         DIVERGING, SNAPSHOT, VALID
     }
 
     public static ValidOffsetAndEpoch diverging(OffsetAndEpoch offsetAndEpoch) {
-        return new ValidOffsetAndEpoch(Type.DIVERGING, offsetAndEpoch);
+        return new ValidOffsetAndEpoch(Kind.DIVERGING, offsetAndEpoch);
     }
 
     public static ValidOffsetAndEpoch snapshot(OffsetAndEpoch offsetAndEpoch) {
-        return new ValidOffsetAndEpoch(Type.SNAPSHOT, offsetAndEpoch);
+        return new ValidOffsetAndEpoch(Kind.SNAPSHOT, offsetAndEpoch);
     }
 
     public static ValidOffsetAndEpoch valid(OffsetAndEpoch offsetAndEpoch) {
-        return new ValidOffsetAndEpoch(Type.VALID, offsetAndEpoch);
+        return new ValidOffsetAndEpoch(Kind.VALID, offsetAndEpoch);
     }
 
     public static ValidOffsetAndEpoch valid() {
         return valid(new OffsetAndEpoch(-1, -1));
     }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null || getClass() != obj.getClass()) return false;
+        ValidOffsetAndEpoch that = (ValidOffsetAndEpoch) obj;
+        return kind == that.kind &&
+                offsetAndEpoch.equals(that.offsetAndEpoch);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, offsetAndEpoch);
+    }
 }
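The equals() and hashCode() overrides added above are what let the new tests compare whole ValidOffsetAndEpoch values with a single assertEquals, as the MockLogTest changes below do. A hedged sketch of that pattern, reusing the offsets from one of the tests below:

    ValidOffsetAndEpoch expected = ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2));
    ValidOffsetAndEpoch actual = log.validateOffsetAndEpoch(100, 3);
    // without the equals() override, this would only pass on reference equality
    assertEquals(expected, actual);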
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index cd48c61..1c7f131 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -657,6 +657,143 @@ public class MockLogTest {
         assertEquals(Optional.empty(), log.readSnapshot(sameEpochSnapshotId));
     }
 
+    @Test
+    public void testValidateEpochGreaterThanLastKnownEpoch() {
+        int numberOfRecords = 1;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(log.endOffset().offset, epoch)),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateEpochLessThanOldestSnapshotEpoch() throws IOException {
+        int offset = 1;
+        int epoch = 1;
+
+        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, epoch);
+        try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+            snapshot.freeze();
+        }
+        log.truncateToLatestSnapshot();
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch - 1);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateOffsetLessThanOldestSnapshotOffset() throws IOException {
+        int offset = 2;
+        int epoch = 1;
+
+        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, epoch);
+        try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+            snapshot.freeze();
+        }
+        log.truncateToLatestSnapshot();
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateOffsetEqualToOldestSnapshotOffset() throws IOException {
+        int offset = 2;
+        int epoch = 1;
+
+        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, epoch);
+        try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+            snapshot.freeze();
+        }
+        log.truncateToLatestSnapshot();
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, olderEpochSnapshotId),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() throws IOException {
+        int numberOfRecords = 5;
+        int offset = 10;
+
+        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, 1);
+        try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+            snapshot.freeze();
+        }
+        log.truncateToLatestSnapshot();
+
+        appendBatch(numberOfRecords, 1);
+        appendBatch(numberOfRecords, 2);
+        appendBatch(numberOfRecords, 4);
+
+        // offset is not equal to oldest snapshot's offset
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(20, 2)),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateEpochLessThanFirstEpochInLog() throws IOException {
+        int numberOfRecords = 5;
+        int offset = 10;
+
+        OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, 1);
+        try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+            snapshot.freeze();
+        }
+        log.truncateToLatestSnapshot();
+
+        appendBatch(numberOfRecords, 3);
+
+        // offset is not equal to oldest snapshot's offset
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, olderEpochSnapshotId),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateOffsetGreatThanEndOffset() {
+        int numberOfRecords = 1;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(log.endOffset().offset, epoch)),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateOffsetLessThanLEO() {
+        int numberOfRecords = 10;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+        appendBatch(numberOfRecords, epoch + 1);
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, epoch);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(10, epoch)),
+                resultOffsetAndEpoch);
+    }
+
+    @Test
+    public void testValidateValidEpochAndOffset() {
+        int numberOfRecords = 5;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+
+        ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords - 1, epoch);
+        assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, new OffsetAndEpoch(numberOfRecords - 1, epoch)),
+                resultOffsetAndEpoch);
+    }
+
     private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
         Records records = log.read(startOffset, isolation).records;
         long firstReadOffset = -1L;