You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 17:30:11 UTC
[2/2] kafka git commit: MINOR: Log append validation improvements
MINOR: Log append validation improvements
- Consistent validation across different code paths in LogValidator
- Validate baseOffset for message format V2
- Flesh out LogValidatorTest to check producerId, baseSequence, producerEpoch and partitionLeaderEpoch.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #2802 from ijuma/validate-base-offset
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5cf64f06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5cf64f06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5cf64f06
Branch: refs/heads/trunk
Commit: 5cf64f06a877a181d12a2ae2390516ba1a572135
Parents: a4c5068
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Apr 7 18:29:55 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 18:29:55 2017 +0100
----------------------------------------------------------------------
.../record/AbstractLegacyRecordBatch.java | 2 +-
.../kafka/common/record/MemoryRecords.java | 31 +-
.../common/record/MemoryRecordsBuilder.java | 33 +-
.../apache/kafka/common/record/RecordBatch.java | 2 +-
.../kafka/common/requests/EpochEndOffset.java | 4 +-
.../common/record/MemoryRecordsBuilderTest.java | 33 +-
.../kafka/common/record/MemoryRecordsTest.java | 6 +-
core/src/main/scala/kafka/log/Log.scala | 11 +-
.../src/main/scala/kafka/log/LogValidator.scala | 153 ++++----
.../scala/kafka/log/ProducerIdMapping.scala | 2 +-
.../server/epoch/LeaderEpochFileCache.scala | 5 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 21 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 214 ++++++-----
.../scala/unit/kafka/log/LogValidatorTest.scala | 368 +++++++++++++------
...rivenReplicationProtocolAcceptanceTest.scala | 3 +-
15 files changed, 529 insertions(+), 359 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 7e09b93..ddb2bc7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -201,7 +201,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
@Override
public int partitionLeaderEpoch() {
- return RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+ return RecordBatch.NO_PARTITION_LEADER_EPOCH;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index b3beed5..9a20a97 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -303,7 +303,7 @@ public class MemoryRecords extends AbstractRecords {
long baseOffset,
long logAppendTime) {
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -323,11 +323,11 @@ public class MemoryRecords extends AbstractRecords {
TimestampType timestampType,
long baseOffset,
long logAppendTime,
- long pid,
- short epoch,
+ long producerId,
+ short producerEpoch,
int baseSequence) {
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
- pid, epoch, baseSequence, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+ producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -336,12 +336,12 @@ public class MemoryRecords extends AbstractRecords {
TimestampType timestampType,
long baseOffset,
long logAppendTime,
- long pid,
- short epoch,
+ long producerId,
+ short producerEpoch,
int baseSequence,
int partitionLeaderEpoch) {
return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
- logAppendTime, pid, epoch, baseSequence, false, partitionLeaderEpoch,
+ logAppendTime, producerId, producerEpoch, baseSequence, false, partitionLeaderEpoch,
buffer.remaining());
}
@@ -357,21 +357,22 @@ public class MemoryRecords extends AbstractRecords {
return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
}
- public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Long pid,
- short epoch, int baseSequence, SimpleRecord... records) {
- return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
- pid, epoch, baseSequence, records);
+ public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+ long producerId, short producerEpoch, int baseSequence,
+ int partitionLeaderEpoch, SimpleRecord... records) {
+ return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
+ baseSequence, partitionLeaderEpoch, records);
}
public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
TimestampType timestampType, SimpleRecord... records) {
return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, records);
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH, records);
}
private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
- TimestampType timestampType, long pid, short epoch, int baseSequence,
- SimpleRecord ... records) {
+ TimestampType timestampType, long producerId, short producerEpoch,
+ int baseSequence, int partitionLeaderEpoch, SimpleRecord ... records) {
if (records.length == 0)
return MemoryRecords.EMPTY;
int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
@@ -380,7 +381,7 @@ public class MemoryRecords extends AbstractRecords {
if (timestampType == TimestampType.LOG_APPEND_TIME)
logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
- logAppendTime, pid, epoch, baseSequence);
+ logAppendTime, producerId, producerEpoch, baseSequence, partitionLeaderEpoch);
for (SimpleRecord record : records)
builder.append(record);
return builder.build();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 7f66193..208db5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -189,17 +189,36 @@ public class MemoryRecordsBuilder {
}
/**
- * Get the max timestamp and its offset. If the log append time is used, then the offset will
- * be either the first offset in the set if no compression is used or the last offset otherwise.
+ * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+ *
+ * If the log append time is used, the offset will be the last offset unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be the first offset.
+ *
+ * If create time is used, the offset will be the last offset unless no compression is used and the message
+ * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+ *
* @return The max timestamp and its offset
*/
public RecordsInfo info() {
- if (timestampType == TimestampType.LOG_APPEND_TIME)
- return new RecordsInfo(logAppendTime, lastOffset);
- else if (maxTimestamp == RecordBatch.NO_TIMESTAMP)
+ if (timestampType == TimestampType.LOG_APPEND_TIME) {
+ long shallowOffsetOfMaxTimestamp;
+ // Use the last offset when dealing with record batches
+ if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+ shallowOffsetOfMaxTimestamp = lastOffset;
+ else
+ shallowOffsetOfMaxTimestamp = baseOffset;
+ return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
+ } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
- else
- return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
+ } else {
+ long shallowOffsetOfMaxTimestamp;
+ // Use the last offset when dealing with record batches
+ if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+ shallowOffsetOfMaxTimestamp = lastOffset;
+ else
+ shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
+ return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+ }
}
public void setProducerState(long pid, short epoch, int baseSequence) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index ae4a225..4fd03e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -58,7 +58,7 @@ public interface RecordBatch extends Iterable<Record> {
* Used to indicate an unknown leader epoch, which will be the case when the record set is
* first created by the producer.
*/
- int UNKNOWN_PARTITION_LEADER_EPOCH = -1;
+ int NO_PARTITION_LEADER_EPOCH = -1;
/**
* Check whether the checksum of this batch is correct.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
index 2d49149..0965e36 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
@@ -18,14 +18,14 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.Errors;
-import static org.apache.kafka.common.record.RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
/**
* The offset, fetched from a leader, for a particular partition.
*/
public class EpochEndOffset {
- public static final long UNDEFINED_EPOCH_OFFSET = UNKNOWN_PARTITION_LEADER_EPOCH;
+ public static final long UNDEFINED_EPOCH_OFFSET = NO_PARTITION_LEADER_EPOCH;
public static final int UNDEFINED_EPOCH = -1;
private Errors error;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index ef48783..3cc8c20 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -50,7 +50,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
MemoryRecords records = builder.build();
assertEquals(0, records.sizeInBytes());
assertEquals(bufferOffset, buffer.position());
@@ -66,7 +66,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
- TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+ TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
buffer.capacity());
builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
MemoryRecords records = builder.build();
@@ -86,7 +86,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test(expected = IllegalArgumentException.class)
@@ -99,7 +99,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test(expected = IllegalArgumentException.class)
@@ -112,7 +112,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test(expected = IllegalArgumentException.class)
@@ -125,7 +125,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test(expected = IllegalArgumentException.class)
@@ -138,7 +138,7 @@ public class MemoryRecordsBuilderTest {
int sequence = RecordBatch.NO_SEQUENCE;
new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test
@@ -154,7 +154,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
int uncompressedSize = 0;
for (LegacyRecord record : records) {
@@ -185,7 +185,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
int uncompressedSize = 0;
for (LegacyRecord record : records) {
@@ -211,7 +211,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
- RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(0L, "b".getBytes(), "2".getBytes());
builder.append(0L, "c".getBytes(), "3".getBytes());
@@ -220,7 +220,10 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(logAppendTime, info.maxTimestamp);
- assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+ if (compressionType != CompressionType.NONE)
+ assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+ else
+ assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
for (RecordBatch batch : records.batches()) {
assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
@@ -237,7 +240,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(2L, "b".getBytes(), "2".getBytes());
builder.append(1L, "c".getBytes(), "3".getBytes());
@@ -270,7 +273,7 @@ public class MemoryRecordsBuilderTest {
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
- RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
+ RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
assertFalse(builder.isFull());
assertTrue(builder.hasRoomFor(0L, key, value));
@@ -296,7 +299,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(1L, "b".getBytes(), "2".getBytes());
@@ -324,7 +327,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index a2c761f..ea430b1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -100,7 +100,7 @@ public class MemoryRecordsTest {
assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
- assertEquals(RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, batch.partitionLeaderEpoch());
+ assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, batch.partitionLeaderEpoch());
assertNull(batch.countOrNull());
if (magic == RecordBatch.MAGIC_VALUE_V0)
assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
@@ -222,9 +222,9 @@ public class MemoryRecordsTest {
assertEquals(4, result.messagesRetained);
assertEquals(buffer.limit(), result.bytesRead);
assertEquals(filtered.limit(), result.bytesRetained);
- if (magic > 0) {
+ if (magic > RecordBatch.MAGIC_VALUE_V0) {
assertEquals(20L, result.maxTimestamp);
- if (compression == CompressionType.NONE)
+ if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
else
assertEquals(5L, result.shallowOffsetOfMaxTimestamp);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0e8cda8..d97dfa4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -603,6 +603,10 @@ class Log(@volatile var dir: File,
val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
for (batch <- records.batches.asScala) {
+ // we only validate V2 and higher to avoid potential compatibility issues with older clients
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
+ throw new InvalidRecordException(s"The baseOffset of the record batch should be 0, but it is ${batch.baseOffset}")
+
// update the first offset if on the first message. For magic versions older than 2, we use the last offset
// to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
// For magic version 2, we can get the first offset directly from the batch header.
@@ -621,8 +625,8 @@ class Log(@volatile var dir: File,
if (batchSize > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
- throw new RecordTooLargeException(s"Message size is $batchSize bytes which exceeds the maximum configured " +
- s"message size of ${config.maxMessageSize}.")
+ throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
+ s"value of ${config.maxMessageSize}.")
}
// check the validity of the message by checking CRC
@@ -654,7 +658,8 @@ class Log(@volatile var dir: File,
firstOffset = lastEntry.firstOffset
lastOffset = lastEntry.lastOffset
maxTimestamp = lastEntry.timestamp
- info(s"Detected a duplicate at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). Ignoring the incoming record.")
+ info(s"Detected a duplicate for partition $topicPartition at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). " +
+ "Ignoring the incoming record.")
} else {
val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
producerAppendInfos.put(pid, producerAppendInfo)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index fa520ad..ae3d846 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -19,7 +19,7 @@ package kafka.log
import java.nio.ByteBuffer
import kafka.common.LongRef
-import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import kafka.message.{CompressionCodec, NoCompressionCodec}
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidTimestampException
import org.apache.kafka.common.record._
@@ -48,33 +48,46 @@ private[kafka] object LogValidator extends Logging {
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
- compactedTopic: Boolean = false,
- messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
- messageTimestampType: TimestampType,
- messageTimestampDiffMaxMs: Long,
- partitionLeaderEpoch: Int = RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH): ValidationAndOffsetAssignResult = {
+ compactedTopic: Boolean,
+ magic: Byte,
+ timestampType: TimestampType,
+ timestampDiffMaxMs: Long,
+ partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
- if (!records.hasMatchingMagic(messageFormatVersion))
- convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
- messageTimestampDiffMaxMs, messageFormatVersion, partitionLeaderEpoch)
+ if (!records.hasMatchingMagic(magic))
+ convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
+ timestampDiffMaxMs, magic, partitionLeaderEpoch)
else
// Do in-place validation, offset assignment and maybe set timestamp
- assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
- messageTimestampDiffMaxMs, partitionLeaderEpoch)
+ assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
+ partitionLeaderEpoch)
} else {
-
validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
- messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs, partitionLeaderEpoch)
+ magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch)
}
}
+ private def validateBatch(batch: RecordBatch): Unit = {
+ ensureNonTransactional(batch)
+ }
+
+ private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
+ timestampDiffMaxMs: Long, compactedTopic: Boolean): Unit = {
+ if (!record.hasMagic(batch.magic))
+ throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}")
+ record.ensureValid()
+ ensureNotControlRecord(record)
+ validateKey(record, compactedTopic)
+ validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs)
+ }
+
private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
offsetCounter: LongRef,
compactedTopic: Boolean,
now: Long,
timestampType: TimestampType,
- messageTimestampDiffMaxMs: Long,
+ timestampDiffMaxMs: Long,
toMagicValue: Byte,
partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
@@ -90,12 +103,10 @@ private[kafka] object LogValidator extends Logging {
offsetCounter.value, now, pid, epoch, sequence, partitionLeaderEpoch)
for (batch <- records.batches.asScala) {
- ensureNonTransactional(batch)
+ validateBatch(batch)
for (record <- batch.asScala) {
- ensureNotControlRecord(record)
- validateKey(record, compactedTopic)
- validateTimestamp(batch, record, now, timestampType, messageTimestampDiffMaxMs)
+ validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
}
}
@@ -111,7 +122,7 @@ private[kafka] object LogValidator extends Logging {
private def assignOffsetsNonCompressed(records: MemoryRecords,
offsetCounter: LongRef,
- currentTimestamp: Long,
+ now: Long,
compactedTopic: Boolean,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
@@ -119,40 +130,50 @@ private[kafka] object LogValidator extends Logging {
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
val initialOffset = offsetCounter.value
+ var isMagicV2 = false
for (batch <- records.batches.asScala) {
- ensureNonTransactional(batch)
+ validateBatch(batch)
+
+ var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
+ var offsetOfMaxBatchTimestamp = -1L
for (record <- batch.asScala) {
- record.ensureValid()
- ensureNotControlRecord(record)
- validateKey(record, compactedTopic)
+ validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
val offset = offsetCounter.getAndIncrement()
- if (batch.magic > RecordBatch.MAGIC_VALUE_V0) {
- validateTimestamp(batch, record, currentTimestamp, timestampType, timestampDiffMaxMs)
-
- if (record.timestamp > maxTimestamp) {
- maxTimestamp = record.timestamp
- offsetOfMaxTimestamp = offset
- }
+ if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
+ maxBatchTimestamp = record.timestamp
+ offsetOfMaxBatchTimestamp = offset
}
}
+ if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
+ maxTimestamp = maxBatchTimestamp
+ offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp
+ }
+
batch.setLastOffset(offsetCounter.value - 1)
- if(batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
- // TODO: in the compressed path, we ensure that the batch max timestamp is correct.
- // We should either do the same or (better) let those two paths converge.
- if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.LOG_APPEND_TIME)
- batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, currentTimestamp)
+ if (batch.magic > RecordBatch.MAGIC_VALUE_V0) {
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, now)
+ else
+ batch.setMaxTimestamp(timestampType, maxBatchTimestamp)
+ }
+
+ isMagicV2 = batch.magic >= RecordBatch.MAGIC_VALUE_V2
}
if (timestampType == TimestampType.LOG_APPEND_TIME) {
- maxTimestamp = currentTimestamp
- offsetOfMaxTimestamp = initialOffset
+ maxTimestamp = now
+ if (isMagicV2)
+ offsetOfMaxTimestamp = offsetCounter.value - 1
+ else
+ offsetOfMaxTimestamp = initialOffset
}
ValidationAndOffsetAssignResult(
@@ -171,36 +192,32 @@ private[kafka] object LogValidator extends Logging {
*/
def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
offsetCounter: LongRef,
- currentTimestamp: Long,
+ now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
- compactedTopic: Boolean = false,
- messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
- messageTimestampType: TimestampType,
- messageTimestampDiffMaxMs: Long,
+ compactedTopic: Boolean,
+ magic: Byte,
+ timestampType: TimestampType,
+ timestampDiffMaxMs: Long,
partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
// No in place assignment situation 1 and 2
- var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0
+ var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0
var maxTimestamp = RecordBatch.NO_TIMESTAMP
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
for (batch <- records.batches.asScala) {
- ensureNonTransactional(batch)
+ validateBatch(batch)
for (record <- batch.asScala) {
- if (!record.hasMagic(batch.magic))
- throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}")
-
- record.ensureValid()
- ensureNotControlRecord(record)
- validateKey(record, compactedTopic)
+ validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+ if (sourceCodec != NoCompressionCodec && record.isCompressed)
+ throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+ s"compression attribute set: $record")
- if (!record.hasMagic(RecordBatch.MAGIC_VALUE_V0) && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0) {
- // Validate the timestamp
- validateTimestamp(batch, record, currentTimestamp, messageTimestampType, messageTimestampDiffMaxMs)
+ if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) {
// Check if we need to overwrite offset
// No in place assignment situation 3
if (record.offset != expectedInnerOffset.getAndIncrement())
@@ -209,12 +226,8 @@ private[kafka] object LogValidator extends Logging {
maxTimestamp = record.timestamp
}
- if (sourceCodec != NoCompressionCodec && record.isCompressed)
- throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
- s"compression attribute set: $record")
-
// No in place assignment situation 4
- if (!record.hasMagic(messageFormatVersion))
+ if (!record.hasMagic(magic))
inPlaceAssignment = false
validatedRecords += record
@@ -223,14 +236,14 @@ private[kafka] object LogValidator extends Logging {
if (!inPlaceAssignment) {
val (pid, epoch, sequence) = {
- // note that we only reassign offsets for requests coming straight from a producer. For records with MagicV2,
+ // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
// there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
- // with older magic versions, this will always be NO_PRODUCER_ID, etc.
+ // with older magic versions, there will never be a producer id, etc.
val first = records.batches.asScala.head
(first.producerId, first.producerEpoch, first.baseSequence)
}
- buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
- CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
+ buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
+ validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
} else {
// we can update the batch only and write the compressed payload as is
val batch = records.batches.iterator.next()
@@ -238,13 +251,13 @@ private[kafka] object LogValidator extends Logging {
batch.setLastOffset(lastOffset)
- if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
- maxTimestamp = currentTimestamp
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ maxTimestamp = now
- if (messageFormatVersion >= RecordBatch.MAGIC_VALUE_V1)
- batch.setMaxTimestamp(messageTimestampType, maxTimestamp)
+ if (magic >= RecordBatch.MAGIC_VALUE_V1)
+ batch.setMaxTimestamp(timestampType, maxTimestamp)
- if(messageFormatVersion >= RecordBatch.MAGIC_VALUE_V2)
+ if (magic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
ValidationAndOffsetAssignResult(validatedRecords = records,
@@ -290,7 +303,7 @@ private[kafka] object LogValidator extends Logging {
private def validateKey(record: Record, compactedTopic: Boolean) {
if (compactedTopic && !record.hasKey)
- throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+ throw new InvalidRecordException("Compacted topic cannot accept message without key.")
}
/**
@@ -305,8 +318,8 @@ private[kafka] object LogValidator extends Logging {
if (timestampType == TimestampType.CREATE_TIME
&& record.timestamp != RecordBatch.NO_TIMESTAMP
&& math.abs(record.timestamp - now) > timestampDiffMaxMs)
- throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
- s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
+ throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
+ s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
s"timestamp type to LogAppendTime.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
index a870b7d..054b2f6 100644
--- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala
+++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
@@ -47,7 +47,7 @@ private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset:
}
private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) {
- // the initialEntry here is the last successfull appended batch. we validate incoming entries transitively, starting
+ // the initialEntry here is the last successful appended batch. we validate incoming entries transitively, starting
// with the last appended entry.
private var epoch = initialEntry.epoch
private var firstSeq = initialEntry.firstSeq
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 4a4727e..fe07a74 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.EpochEndOffset
import scala.collection.mutable.ListBuffer
@@ -50,7 +49,7 @@ trait LeaderEpochCache {
*/
class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
private val lock = new ReentrantReadWriteLock()
- private var epochs: ListBuffer[EpochEntry] = lock synchronized { ListBuffer(checkpoint.read(): _*) }
+ private var epochs: ListBuffer[EpochEntry] = lock.synchronized { ListBuffer(checkpoint.read(): _*) }
private var cachedLatestEpoch: Option[Int] = None //epoch which has yet to be assigned to a message.
/**
@@ -221,4 +220,4 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
}
// Mapping of epoch to the first offset of the subsequent epoch
-case class EpochEntry(epoch: Int, startOffset: Long)
\ No newline at end of file
+case class EpochEntry(epoch: Int, startOffset: Long)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 928b03d..3261626 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -276,9 +276,9 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
- val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+ def createRecorcs = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
- log.append(messageSet, assignOffsets = true)
+ log.append(createRecorcs, assignOffsets = true)
val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
@@ -294,9 +294,9 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
- val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+ def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
- log.append(messageSet, assignOffsets = true)
+ log.append(createRecords, assignOffsets = true)
// segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
val segs = log.logSegments.toSeq
@@ -819,18 +819,19 @@ class LogCleanerTest extends JUnitSuite {
def record(key: Int, value: Int, pid: Long = RecordBatch.NO_PRODUCER_ID, epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
- sequence: Int = RecordBatch.NO_SEQUENCE): MemoryRecords = {
- MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, sequence,
- new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
+ sequence: Int = RecordBatch.NO_SEQUENCE,
+ partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
+ MemoryRecords.withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, CompressionType.NONE, pid, epoch, sequence,
+ partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
}
- def record(key: Int, value: Array[Byte]) =
+ def record(key: Int, value: Array[Byte]): MemoryRecords =
TestUtils.singletonRecords(key = key.toString.getBytes, value = value)
- def unkeyedRecord(value: Int) =
+ def unkeyedRecord(value: Int): MemoryRecords =
TestUtils.singletonRecords(value = value.toString.getBytes)
- def tombstoneRecord(key: Int) = record(key, null)
+ def tombstoneRecord(key: Int): MemoryRecords = record(key, null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4fcf1c3..b61f261 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -24,7 +24,6 @@ import java.util.Properties
import org.apache.kafka.common.errors._
import kafka.api.ApiVersion
import org.junit.Assert._
-import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils._
import kafka.server.KafkaConfig
@@ -37,7 +36,7 @@ import org.easymock.EasyMock._
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
-class LogTest extends JUnitSuite {
+class LogTest {
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
@@ -69,7 +68,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testTimeBasedLogRoll() {
- val set = TestUtils.singletonRecords("test".getBytes)
+ def createRecords = TestUtils.singletonRecords("test".getBytes)
val logProps = new Properties()
logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
@@ -86,36 +85,36 @@ class LogTest extends JUnitSuite {
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
// Test the segment rolling behavior when messages do not have a timestamp.
time.sleep(log.config.segmentMs + 1)
- log.append(set)
+ log.append(createRecords)
assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
- log.append(set)
+ log.append(createRecords)
assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments)
- for(numSegments <- 3 until 5) {
+ for (numSegments <- 3 until 5) {
time.sleep(log.config.segmentMs + 1)
- log.append(set)
+ log.append(createRecords)
assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
}
- // Append a message with timestamp to a segment whose first messgae do not have a timestamp.
- val setWithTimestamp =
- TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
- log.append(setWithTimestamp)
+ // Append a message with timestamp to a segment whose first message do not have a timestamp.
+ val timestamp = time.milliseconds + log.config.segmentMs + 1
+ def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp)
+ log.append(createRecordsWithTimestamp)
assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments)
// Test the segment rolling behavior when messages have timestamps.
time.sleep(log.config.segmentMs + 1)
- log.append(setWithTimestamp)
+ log.append(createRecordsWithTimestamp)
assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
// move the wall clock beyond log rolling time
time.sleep(log.config.segmentMs + 1)
- log.append(setWithTimestamp)
+ log.append(createRecordsWithTimestamp)
assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments)
- val setWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
- log.append(setWithExpiredTimestamp)
+ val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ log.append(recordWithExpiredTimestamp)
assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments)
val numSegments = log.numberOfSegments
@@ -161,66 +160,67 @@ class LogTest extends JUnitSuite {
var seq = 0
// Pad the beginning of the log.
- for (i <- 0 to 5) {
+ for (_ <- 0 to 5) {
val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
pid = pid, epoch = epoch, sequence = seq)
log.append(record, assignOffsets = true)
seq = seq + 1
}
// Append an entry with multiple log records.
- var record = TestUtils.records(List(
+ def createRecords = TestUtils.records(List(
new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
), pid = pid, epoch = epoch, sequence = seq)
- val multiEntryAppendInfo = log.append(record, assignOffsets = true)
+ val multiEntryAppendInfo = log.append(createRecords, assignOffsets = true)
assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
- seq = seq + 3
// Append a Duplicate of the tail, when the entry at the tail has multiple records.
- val dupMultiEntryAppendInfo = log.append(record, assignOffsets = true)
+ val dupMultiEntryAppendInfo = log.append(createRecords, assignOffsets = true)
assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
+ seq = seq + 3
+
// Append a partial duplicate of the tail. This is not allowed.
try {
- record = TestUtils.records(
+ val records = TestUtils.records(
List(
new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
pid = pid, epoch = epoch, sequence = seq - 2)
- log.append(record, assignOffsets = true)
- fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+ log.append(records, assignOffsets = true)
+ fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
"in the middle of the log.")
} catch {
- case e: OutOfOrderSequenceException => // Good!
+ case _: OutOfOrderSequenceException => // Good!
}
// Append a Duplicate of an entry in the middle of the log. This is not allowed.
try {
- record = TestUtils.records(
+ val records = TestUtils.records(
List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
pid = pid, epoch = epoch, sequence = 1)
- log.append(record, assignOffsets = true)
- fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+ log.append(records, assignOffsets = true)
+ fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
"in the middle of the log.")
} catch {
- case e: OutOfOrderSequenceException => // Good!
+ case _: OutOfOrderSequenceException => // Good!
}
- // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
- record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+ // Append a duplicate entry with a single records at the tail of the log. This should return the appendInfo of the original entry.
+ def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
pid = pid, epoch = epoch, sequence = seq)
- val origAppendInfo = log.append(record, assignOffsets = true)
- val newAppendInfo = log.append(record, assignOffsets = true)
- assertEquals("Inserted a duplicate record into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
- assertEquals("Inserted a duplicate record into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
+ val origAppendInfo = log.append(createRecordsWithDuplicate, assignOffsets = true)
+ val newAppendInfo = log.append(createRecordsWithDuplicate, assignOffsets = true)
+ assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+ assertEquals("Inserted a duplicate records into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
}
@Test
- def testMulitplePidsPerMemoryRecord() : Unit = {
+ def testMultiplePidsPerMemoryRecord() : Unit = {
val logProps = new Properties()
// create a log
@@ -374,8 +374,8 @@ class LogTest extends JUnitSuite {
*/
@Test
def testSizeBasedLogRoll() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
- val setSize = set.sizeInBytes
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ val setSize = createRecords.sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@@ -388,9 +388,8 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
- for (_ <- 1 to (msgPerSeg + 1)) {
- log.append(set)
- }
+ for (_ <- 1 to (msgPerSeg + 1))
+ log.append(createRecords)
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
}
@@ -949,8 +948,8 @@ class LogTest extends JUnitSuite {
*/
@Test
def testTruncateTo() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
- val setSize = set.sizeInBytes
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ val setSize = createRecords.sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
@@ -962,7 +961,7 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (_ <- 1 to msgPerSeg)
- log.append(set)
+ log.append(createRecords)
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
@@ -983,7 +982,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should change log size", 0, log.size)
for (_ <- 1 to msgPerSeg)
- log.append(set)
+ log.append(createRecords)
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
@@ -992,7 +991,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should change log size", log.size, 0)
for (_ <- 1 to msgPerSeg)
- log.append(set)
+ log.append(createRecords)
assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
assertEquals("log size should be same as before", size, log.size)
@@ -1050,9 +1049,9 @@ class LogTest extends JUnitSuite {
val bogusIndex2 = Log.indexFilename(logDir, 5)
val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5)
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = new Log(logDir,
@@ -1069,7 +1068,7 @@ class LogTest extends JUnitSuite {
// check that we can append to the log
for (_ <- 0 until 10)
- log.append(set)
+ log.append(createRecords)
log.delete()
}
@@ -1079,9 +1078,9 @@ class LogTest extends JUnitSuite {
*/
@Test
def testReopenThenTruncate() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
val config = LogConfig(logProps)
@@ -1096,7 +1095,7 @@ class LogTest extends JUnitSuite {
// add enough messages to roll over several segments then close and re-open and attempt to truncate
for (_ <- 0 until 100)
- log.append(set)
+ log.append(createRecords)
log.close()
log = new Log(logDir,
config,
@@ -1114,10 +1113,10 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAsyncDelete() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L)
val asyncDeleteMs = 1000
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
@@ -1133,7 +1132,7 @@ class LogTest extends JUnitSuite {
// append some messages to create some segments
for (_ <- 0 until 100)
- log.append(set)
+ log.append(createRecords)
// files should be renamed
val segments = log.logSegments.toArray
@@ -1159,9 +1158,9 @@ class LogTest extends JUnitSuite {
*/
@Test
def testOpenDeletesObsoleteFiles() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
val config = LogConfig(logProps)
@@ -1174,7 +1173,7 @@ class LogTest extends JUnitSuite {
// append some messages to create some segments
for (_ <- 0 until 100)
- log.append(set)
+ log.append(createRecords)
// expire all segments
log.deleteOldSegments()
@@ -1237,7 +1236,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
val config = LogConfig(logProps)
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val recoveryPoint = 50L
for (_ <- 0 until 10) {
// create a log and write some messages to it
@@ -1250,7 +1249,7 @@ class LogTest extends JUnitSuite {
time = time)
val numMessages = 50 + TestUtils.random.nextInt(50)
for (_ <- 0 until numMessages)
- log.append(set)
+ log.append(createRecords)
val records = log.logSegments.flatMap(_.log.records.asScala.toList).toList
log.close()
@@ -1326,7 +1325,7 @@ class LogTest extends JUnitSuite {
logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val parentLogDir = logDir.getParentFile
assertTrue("Data directory %s must exist", parentLogDir.isDirectory)
val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
@@ -1341,7 +1340,7 @@ class LogTest extends JUnitSuite {
scheduler = time.scheduler,
time = time)
for (_ <- 0 until 100)
- log.append(set)
+ log.append(createRecords)
log.close()
// check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
@@ -1428,9 +1427,9 @@ class LogTest extends JUnitSuite {
@Test
def testDeleteOldSegmentsMethod() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
val config = LogConfig(logProps)
@@ -1443,7 +1442,7 @@ class LogTest extends JUnitSuite {
// append some messages to create some segments
for (_ <- 0 until 100)
- log.append(set)
+ log.append(createRecords)
log.leaderEpochCache.assign(0, 40)
log.leaderEpochCache.assign(1, 90)
@@ -1456,7 +1455,7 @@ class LogTest extends JUnitSuite {
// append some messages to create some segments
for (_ <- 0 until 100)
- log.append(set)
+ log.append(createRecords)
log.delete()
assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
@@ -1467,11 +1466,11 @@ class LogTest extends JUnitSuite {
@Test
def testLogDeletionAfterDeleteRecords() {
- val set = TestUtils.singletonRecords("test".getBytes)
- val log = createLog(set.sizeInBytes)
+ def createRecords = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(createRecords.sizeInBytes)
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
assertEquals("should have 3 segments", 3, log.numberOfSegments)
assertEquals(log.logStartOffset, 0)
@@ -1497,12 +1496,12 @@ class LogTest extends JUnitSuite {
@Test
def shouldDeleteSizeBasedSegments() {
- val set = TestUtils.singletonRecords("test".getBytes)
- val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+ def createRecords = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
// append some messages to create some segments
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
log.deleteOldSegments
assertEquals("should have 2 segments", 2,log.numberOfSegments)
@@ -1510,12 +1509,12 @@ class LogTest extends JUnitSuite {
@Test
def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
- val set = TestUtils.singletonRecords("test".getBytes)
- val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
+ def createRecords = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 15)
// append some messages to create some segments
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
log.deleteOldSegments
assertEquals("should have 3 segments", 3,log.numberOfSegments)
@@ -1523,12 +1522,12 @@ class LogTest extends JUnitSuite {
@Test
def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
- val set = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
- val log = createLog(set.sizeInBytes, retentionMs = 10000)
+ def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
+ val log = createLog(createRecords.sizeInBytes, retentionMs = 10000)
// append some messages to create some segments
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
log.deleteOldSegments()
assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
@@ -1536,12 +1535,12 @@ class LogTest extends JUnitSuite {
@Test
def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
- val set = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds)
- val log = createLog(set.sizeInBytes, retentionMs = 10000000)
+ def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds)
+ val log = createLog(createRecords.sizeInBytes, retentionMs = 10000000)
// append some messages to create some segments
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
log.deleteOldSegments()
assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
@@ -1549,14 +1548,14 @@ class LogTest extends JUnitSuite {
@Test
def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
- val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
- val log = createLog(set.sizeInBytes,
+ def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+ val log = createLog(createRecords.sizeInBytes,
retentionMs = 10000,
cleanupPolicy = "compact")
// append some messages to create some segments
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
// mark oldest segment as older the retention.ms
log.logSegments.head.lastModified = time.milliseconds - 20000
@@ -1568,14 +1567,14 @@ class LogTest extends JUnitSuite {
@Test
def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
- val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
- val log = createLog(set.sizeInBytes,
+ def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
+ val log = createLog(createRecords.sizeInBytes,
retentionMs = 10000,
cleanupPolicy = "compact,delete")
// append some messages to create some segments
for (_ <- 0 until 15)
- log.append(set)
+ log.append(createRecords)
log.deleteOldSegments()
assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
@@ -1583,25 +1582,24 @@ class LogTest extends JUnitSuite {
@Test
def shouldApplyEpochToMessageOnAppendIfLeader() {
- val messageIds = (0 until 50).toArray
- val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
+ val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes))
//Given this partition is on leader epoch 72
val epoch = 72
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
//When appending messages as a leader (i.e. assignOffsets = true)
- for (i <- records.indices)
+ for (record <- records)
log.append(
- MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)),
+ MemoryRecords.withRecords(CompressionType.NONE, record),
leaderEpochCache = mockCache(epoch),
assignOffsets = true
)
//Then leader epoch should be set on messages
for (i <- records.indices) {
- val read = log.read(i, 100, Some(i+1)).records.batches().iterator.next()
- assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch())
+ val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+ assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch)
}
}
@@ -1615,7 +1613,7 @@ class LogTest extends JUnitSuite {
//Given each message has an offset & epoch, as msgs from leader would
def recordsForEpoch(i: Int): MemoryRecords = {
val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i))
- recs.batches().asScala.foreach{record =>
+ recs.batches.asScala.foreach{record =>
record.setPartitionLeaderEpoch(42)
record.setLastOffset(i)
}
@@ -1623,7 +1621,7 @@ class LogTest extends JUnitSuite {
}
//Verify we save the epoch to the cache.
- expect(cache.assign(EasyMock.eq(42), anyInt())).times(records.size)
+ expect(cache.assign(EasyMock.eq(42), anyInt)).times(records.size)
replay(cache)
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -1637,13 +1635,13 @@ class LogTest extends JUnitSuite {
@Test
def shouldTruncateLeaderEpochsWhenDeletingSegments() {
- val set = TestUtils.singletonRecords("test".getBytes)
- val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+ def createRecords = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
val cache = epochCache(log)
// Given three segments of 5 messages each
for (e <- 0 until 15) {
- log.append(set)
+ log.append(createRecords)
}
//Given epochs
@@ -1660,13 +1658,13 @@ class LogTest extends JUnitSuite {
@Test
def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
- val set = TestUtils.singletonRecords("test".getBytes)
- val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+ def createRecords = TestUtils.singletonRecords("test".getBytes)
+ val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
val cache = epochCache(log)
// Given three segments of 5 messages each
for (e <- 0 until 15) {
- log.append(set)
+ log.append(createRecords)
}
//Given epochs
@@ -1683,14 +1681,14 @@ class LogTest extends JUnitSuite {
@Test
def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
- val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
- val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * set.sizeInBytes).toString)
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+ val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString)
val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val cache = epochCache(log)
//Given 2 segments, 10 messages per segment
for (epoch <- 1 to 20)
- log.append(set)
+ log.append(createRecords)
//Simulate some leader changes at specific offsets
cache.assign(0, 0)
@@ -1704,25 +1702,25 @@ class LogTest extends JUnitSuite {
log.truncateTo(log.logEndOffset)
//Then no change
- assertEquals(3, cache.epochEntries().size)
+ assertEquals(3, cache.epochEntries.size)
//When truncate
log.truncateTo(11)
//Then no change
- assertEquals(2, cache.epochEntries().size)
+ assertEquals(2, cache.epochEntries.size)
//When truncate
log.truncateTo(10)
//Then
- assertEquals(1, cache.epochEntries().size)
+ assertEquals(1, cache.epochEntries.size)
//When truncate all
log.truncateTo(0)
//Then
- assertEquals(0, cache.epochEntries().size)
+ assertEquals(0, cache.epochEntries.size)
}
/**
@@ -1800,7 +1798,7 @@ class LogTest extends JUnitSuite {
private def mockCache(epoch: Int) = {
val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
- EasyMock.expect(cache.latestUsedEpoch()).andReturn(epoch).anyTimes()
+ EasyMock.expect(cache.latestUsedEpoch).andReturn(epoch).anyTimes
EasyMock.replay(cache)
cache
}