You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/19 17:11:09 UTC
[kafka] branch trunk updated: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch (#10276)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a19806f KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch (#10276)
a19806f is described below
commit a19806f2628e79baa2dbf84ea307787af7ca186f
Author: Rohit Deshpande <ro...@gmail.com>
AuthorDate: Fri Mar 19 10:09:38 2021 -0700
KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch (#10276)
Improves test coverage of `validateOffsetAndEpoch`.
Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 190 +++++++++++++++++++--
.../org/apache/kafka/raft/KafkaRaftClient.java | 4 +-
.../org/apache/kafka/raft/ValidOffsetAndEpoch.java | 34 +++-
.../java/org/apache/kafka/raft/MockLogTest.java | 137 +++++++++++++++
4 files changed, 341 insertions(+), 24 deletions(-)
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index bc7d110..ffbba2a 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.internals.BatchBuilder
-import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
+import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -132,7 +132,7 @@ final class KafkaMetadataLogTest {
assertEquals(offset, log.highWatermark.offset)
val newRecords = 10
- append(log, newRecords, epoch + 1, log.endOffset.offset)
+ append(log, newRecords, epoch + 1)
// Start offset should not change since a new snapshot was not generated
assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(offset + newRecords, epoch)))
assertEquals(offset, log.startOffset)
@@ -154,7 +154,7 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}
- append(log, offset, epoch, log.endOffset.offset)
+ append(log, offset, epoch)
val newSnapshotId = new OffsetAndEpoch(offset * 2, epoch)
TestUtils.resource(log.createSnapshot(newSnapshotId)) { snapshot =>
snapshot.freeze()
@@ -232,7 +232,7 @@ final class KafkaMetadataLogTest {
val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1)
- append(log, numberOfRecords, epoch, log.endOffset.offset)
+ append(log, numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(greaterEpochSnapshotId)) { snapshot =>
snapshot.freeze()
@@ -258,20 +258,20 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}
- append(log, 1, epoch, log.endOffset.offset)
+ append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
snapshot.freeze()
}
- append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+ append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
snapshot.freeze()
}
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
- append(log, numberOfRecords, epoch, log.endOffset.offset)
+ append(log, numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(greaterSnapshotId)) { snapshot =>
snapshot.freeze()
}
@@ -307,7 +307,7 @@ final class KafkaMetadataLogTest {
assertFalse(log.truncateToLatestSnapshot())
- append(log, numberOfRecords, epoch, log.endOffset.offset)
+ append(log, numberOfRecords, epoch)
val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(olderOffsetSnapshotId)) { snapshot =>
@@ -366,20 +366,20 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}
- append(log, 1, epoch, log.endOffset.offset)
+ append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId2)) { snapshot =>
snapshot.freeze()
}
- append(log, numberOfRecords - 2, epoch, log.endOffset.offset)
+ append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(oldSnapshotId3)) { snapshot =>
snapshot.freeze()
}
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
- append(log, numberOfRecords, epoch, log.endOffset.offset)
+ append(log, numberOfRecords, epoch)
TestUtils.resource(log.createSnapshot(greaterSnapshotId)) { snapshot =>
snapshot.freeze()
}
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
batchBuilder.build()
}
+ @Test
+ def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val numberOfRecords = 1
+ val epoch = 1
+
+ append(log, numberOfRecords, epoch)
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+ assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+ assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val numberOfRecords = 10
+ val epoch = 1
+
+ append(log, numberOfRecords, epoch)
+ log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+ val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+ TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+ snapshot.freeze()
+ }
+ assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+ assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+ assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val offset = 2
+ val epoch = 1
+
+ append(log, offset, epoch)
+ log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+ val snapshotId = new OffsetAndEpoch(offset, epoch)
+ TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+ snapshot.freeze()
+ }
+ assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+ assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+ assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val offset = 2
+ val epoch = 1
+
+ append(log, offset, epoch)
+ log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+ val snapshotId = new OffsetAndEpoch(offset, epoch)
+ TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+ snapshot.freeze()
+ }
+ assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+ assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
+ assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): Unit = {
+ val offset = 10
+ val numOfRecords = 5
+
+ val log = buildMetadataLog(tempDir, mockTime)
+ log.updateHighWatermark(new LogOffsetMetadata(offset))
+ val snapshotId = new OffsetAndEpoch(offset, 1)
+ TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+ snapshot.freeze()
+ }
+ log.truncateToLatestSnapshot()
+
+
+ append(log, numOfRecords, epoch = 1)
+ append(log, numOfRecords, epoch = 2)
+ append(log, numOfRecords, epoch = 4)
+
+ // offset is not equal to oldest snapshot's offset
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3)
+ assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+ assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateEpochLessThanFirstEpochInLog(): Unit = {
+ val offset = 10
+ val numOfRecords = 5
+
+ val log = buildMetadataLog(tempDir, mockTime)
+ log.updateHighWatermark(new LogOffsetMetadata(offset))
+ val snapshotId = new OffsetAndEpoch(offset, 1)
+ TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+ snapshot.freeze()
+ }
+ log.truncateToLatestSnapshot()
+
+ append(log, numOfRecords, epoch = 3)
+
+ // offset is not equal to oldest snapshot's offset
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2)
+ assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+ assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateOffsetGreatThanEndOffset(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val numberOfRecords = 1
+ val epoch = 1
+
+ append(log, numberOfRecords, epoch)
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)
+ assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+ assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateOffsetLessThanLEO(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val numberOfRecords = 10
+ val epoch = 1
+
+ append(log, numberOfRecords, epoch)
+ append(log, numberOfRecords, epoch + 1)
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, epoch)
+ assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+ assertEquals(new OffsetAndEpoch(10, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+ }
+
+ @Test
+ def testValidateValidEpochAndOffset(): Unit = {
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ val numberOfRecords = 5
+ val epoch = 1
+
+ append(log, numberOfRecords, epoch)
+
+ val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords - 1, epoch)
+ assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
+ assertEquals(new OffsetAndEpoch(numberOfRecords - 1, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+ }
}
object KafkaMetadataLogTest {
@@ -536,10 +700,10 @@ object KafkaMetadataLogTest {
log
}
- def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int, initialOffset: Long = 0L): LogAppendInfo = {
+ def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo = {
log.appendAsLeader(
MemoryRecords.withRecords(
- initialOffset,
+ log.endOffset().offset,
CompressionType.NONE,
epoch,
(0 until numberOfRecords).map(number => new SimpleRecord(number.toString.getBytes)): _*
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 7a938c4..e02e20b2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -911,7 +911,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
.setLeaderEpoch(quorum.epoch())
.setLeaderId(quorum.leaderIdOrSentinel());
- switch (validOffsetAndEpoch.type()) {
+ switch (validOffsetAndEpoch.kind()) {
case DIVERGING:
partitionData.divergingEpoch()
.setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch)
@@ -1039,7 +1039,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
final Records records;
- if (validOffsetAndEpoch.type() == ValidOffsetAndEpoch.Type.VALID) {
+ if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
index 5929e22..8f1d183 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
@@ -16,40 +16,56 @@
*/
package org.apache.kafka.raft;
+import java.util.Objects;
+
public final class ValidOffsetAndEpoch {
- final private Type type;
+ final private Kind kind;
final private OffsetAndEpoch offsetAndEpoch;
- ValidOffsetAndEpoch(Type type, OffsetAndEpoch offsetAndEpoch) {
- this.type = type;
+ ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
+ this.kind = kind;
this.offsetAndEpoch = offsetAndEpoch;
}
- public Type type() {
- return type;
+ public Kind kind() {
+ return kind;
}
public OffsetAndEpoch offsetAndEpoch() {
return offsetAndEpoch;
}
- public static enum Type {
+ public enum Kind {
DIVERGING, SNAPSHOT, VALID
}
public static ValidOffsetAndEpoch diverging(OffsetAndEpoch offsetAndEpoch) {
- return new ValidOffsetAndEpoch(Type.DIVERGING, offsetAndEpoch);
+ return new ValidOffsetAndEpoch(Kind.DIVERGING, offsetAndEpoch);
}
public static ValidOffsetAndEpoch snapshot(OffsetAndEpoch offsetAndEpoch) {
- return new ValidOffsetAndEpoch(Type.SNAPSHOT, offsetAndEpoch);
+ return new ValidOffsetAndEpoch(Kind.SNAPSHOT, offsetAndEpoch);
}
public static ValidOffsetAndEpoch valid(OffsetAndEpoch offsetAndEpoch) {
- return new ValidOffsetAndEpoch(Type.VALID, offsetAndEpoch);
+ return new ValidOffsetAndEpoch(Kind.VALID, offsetAndEpoch);
}
public static ValidOffsetAndEpoch valid() {
return valid(new OffsetAndEpoch(-1, -1));
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ ValidOffsetAndEpoch that = (ValidOffsetAndEpoch) obj;
+ return kind == that.kind &&
+ offsetAndEpoch.equals(that.offsetAndEpoch);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(kind, offsetAndEpoch);
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index cd48c61..1c7f131 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -657,6 +657,143 @@ public class MockLogTest {
assertEquals(Optional.empty(), log.readSnapshot(sameEpochSnapshotId));
}
+ @Test
+ public void testValidateEpochGreaterThanLastKnownEpoch() {
+ int numberOfRecords = 1;
+ int epoch = 1;
+
+ appendBatch(numberOfRecords, epoch);
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(log.endOffset().offset, epoch)),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateEpochLessThanOldestSnapshotEpoch() throws IOException {
+ int offset = 1;
+ int epoch = 1;
+
+ OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, epoch);
+ try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+ snapshot.freeze();
+ }
+ log.truncateToLatestSnapshot();
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch - 1);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateOffsetLessThanOldestSnapshotOffset() throws IOException {
+ int offset = 2;
+ int epoch = 1;
+
+ OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, epoch);
+ try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+ snapshot.freeze();
+ }
+ log.truncateToLatestSnapshot();
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.SNAPSHOT, olderEpochSnapshotId),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateOffsetEqualToOldestSnapshotOffset() throws IOException {
+ int offset = 2;
+ int epoch = 1;
+
+ OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, epoch);
+ try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+ snapshot.freeze();
+ }
+ log.truncateToLatestSnapshot();
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, olderEpochSnapshotId),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() throws IOException {
+ int numberOfRecords = 5;
+ int offset = 10;
+
+ OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, 1);
+ try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+ snapshot.freeze();
+ }
+ log.truncateToLatestSnapshot();
+
+ appendBatch(numberOfRecords, 1);
+ appendBatch(numberOfRecords, 2);
+ appendBatch(numberOfRecords, 4);
+
+ // offset is not equal to oldest snapshot's offset
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(20, 2)),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateEpochLessThanFirstEpochInLog() throws IOException {
+ int numberOfRecords = 5;
+ int offset = 10;
+
+ OffsetAndEpoch olderEpochSnapshotId = new OffsetAndEpoch(offset, 1);
+ try (RawSnapshotWriter snapshot = log.createSnapshot(olderEpochSnapshotId)) {
+ snapshot.freeze();
+ }
+ log.truncateToLatestSnapshot();
+
+ appendBatch(numberOfRecords, 3);
+
+ // offset is not equal to oldest snapshot's offset
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, olderEpochSnapshotId),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateOffsetGreatThanEndOffset() {
+ int numberOfRecords = 1;
+ int epoch = 1;
+
+ appendBatch(numberOfRecords, epoch);
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(log.endOffset().offset, epoch)),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateOffsetLessThanLEO() {
+ int numberOfRecords = 10;
+ int epoch = 1;
+
+ appendBatch(numberOfRecords, epoch);
+ appendBatch(numberOfRecords, epoch + 1);
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, epoch);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.DIVERGING, new OffsetAndEpoch(10, epoch)),
+ resultOffsetAndEpoch);
+ }
+
+ @Test
+ public void testValidateValidEpochAndOffset() {
+ int numberOfRecords = 5;
+ int epoch = 1;
+
+ appendBatch(numberOfRecords, epoch);
+
+ ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords - 1, epoch);
+ assertEquals(new ValidOffsetAndEpoch(ValidOffsetAndEpoch.Kind.VALID, new OffsetAndEpoch(numberOfRecords - 1, epoch)),
+ resultOffsetAndEpoch);
+ }
+
private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
Records records = log.read(startOffset, isolation).records;
long firstReadOffset = -1L;