Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 17:30:11 UTC

[2/2] kafka git commit: MINOR: Log append validation improvements

MINOR: Log append validation improvements

- Consistent validation across different code paths in LogValidator
- Validate baseOffset for message format V2
- Flesh out LogValidatorTest to check producerId, baseSequence, producerEpoch and partitionLeaderEpoch.

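For context, the baseOffset validation mentioned in the second bullet is the check added to Log.scala later in this diff. The sketch below restates its shape in Scala; the wrapper name validateBaseOffsets is hypothetical, while isFromClient, RecordBatch.MAGIC_VALUE_V2 and InvalidRecordException are taken from the patch itself:

    import org.apache.kafka.common.record._
    import scala.collection.JavaConverters._

    // Hypothetical wrapper around the check added to the record-analysis loop in Log.scala:
    // client-produced batches with magic >= V2 must have a baseOffset of 0.
    def validateBaseOffsets(records: MemoryRecords, isFromClient: Boolean): Unit = {
      for (batch <- records.batches.asScala) {
        // only validate V2 and higher to avoid compatibility issues with older clients
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
          throw new InvalidRecordException(
            s"The baseOffset of the record batch should be 0, but it is ${batch.baseOffset}")
      }
    }
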
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2802 from ijuma/validate-base-offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5cf64f06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5cf64f06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5cf64f06

Branch: refs/heads/trunk
Commit: 5cf64f06a877a181d12a2ae2390516ba1a572135
Parents: a4c5068
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Apr 7 18:29:55 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 18:29:55 2017 +0100

----------------------------------------------------------------------
 .../record/AbstractLegacyRecordBatch.java       |   2 +-
 .../kafka/common/record/MemoryRecords.java      |  31 +-
 .../common/record/MemoryRecordsBuilder.java     |  33 +-
 .../apache/kafka/common/record/RecordBatch.java |   2 +-
 .../kafka/common/requests/EpochEndOffset.java   |   4 +-
 .../common/record/MemoryRecordsBuilderTest.java |  33 +-
 .../kafka/common/record/MemoryRecordsTest.java  |   6 +-
 core/src/main/scala/kafka/log/Log.scala         |  11 +-
 .../src/main/scala/kafka/log/LogValidator.scala | 153 ++++----
 .../scala/kafka/log/ProducerIdMapping.scala     |   2 +-
 .../server/epoch/LeaderEpochFileCache.scala     |   5 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  21 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 214 ++++++-----
 .../scala/unit/kafka/log/LogValidatorTest.scala | 368 +++++++++++++------
 ...rivenReplicationProtocolAcceptanceTest.scala |   3 +-
 15 files changed, 529 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 7e09b93..ddb2bc7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -201,7 +201,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
 
     @Override
     public int partitionLeaderEpoch() {
-        return RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+        return RecordBatch.NO_PARTITION_LEADER_EPOCH;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index b3beed5..9a20a97 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -303,7 +303,7 @@ public class MemoryRecords extends AbstractRecords {
                                                long baseOffset,
                                                long logAppendTime) {
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
-                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -323,11 +323,11 @@ public class MemoryRecords extends AbstractRecords {
                                                TimestampType timestampType,
                                                long baseOffset,
                                                long logAppendTime,
-                                               long pid,
-                                               short epoch,
+                                               long producerId,
+                                               short producerEpoch,
                                                int baseSequence) {
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
-                pid, epoch, baseSequence, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+                producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -336,12 +336,12 @@ public class MemoryRecords extends AbstractRecords {
                                                TimestampType timestampType,
                                                long baseOffset,
                                                long logAppendTime,
-                                               long pid,
-                                               short epoch,
+                                               long producerId,
+                                               short producerEpoch,
                                                int baseSequence,
                                                int partitionLeaderEpoch) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
-                logAppendTime, pid, epoch, baseSequence, false, partitionLeaderEpoch,
+                logAppendTime, producerId, producerEpoch, baseSequence, false, partitionLeaderEpoch,
                 buffer.remaining());
     }
 
@@ -357,21 +357,22 @@ public class MemoryRecords extends AbstractRecords {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
     }
 
-    public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Long pid,
-                                            short epoch, int baseSequence, SimpleRecord... records) {
-        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
-                pid, epoch, baseSequence, records);
+    public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                            long producerId, short producerEpoch, int baseSequence,
+                                            int partitionLeaderEpoch, SimpleRecord... records) {
+        return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
+                baseSequence, partitionLeaderEpoch, records);
     }
 
     public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
                                             TimestampType timestampType, SimpleRecord... records) {
         return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID,
-                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, records);
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH, records);
     }
 
     private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
-                                             TimestampType timestampType, long pid, short epoch, int baseSequence,
-                                             SimpleRecord ... records) {
+                                             TimestampType timestampType, long producerId, short producerEpoch,
+                                             int baseSequence, int partitionLeaderEpoch, SimpleRecord ... records) {
         if (records.length == 0)
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
@@ -380,7 +381,7 @@ public class MemoryRecords extends AbstractRecords {
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
-                logAppendTime, pid, epoch, baseSequence);
+                logAppendTime, producerId, producerEpoch, baseSequence, partitionLeaderEpoch);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 7f66193..208db5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -189,17 +189,36 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Get the max timestamp and its offset. If the log append time is used, then the offset will
-     * be either the first offset in the set if no compression is used or the last offset otherwise.
+     * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+     *
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
+     *
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+     *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
-        if (timestampType == TimestampType.LOG_APPEND_TIME)
-            return new RecordsInfo(logAppendTime,  lastOffset);
-        else if (maxTimestamp == RecordBatch.NO_TIMESTAMP)
+        if (timestampType == TimestampType.LOG_APPEND_TIME) {
+            long shallowOffsetOfMaxTimestamp;
+            // Use the last offset when dealing with record batches
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                shallowOffsetOfMaxTimestamp = lastOffset;
+            else
+                shallowOffsetOfMaxTimestamp = baseOffset;
+            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
             return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
-        else
-            return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
+        } else {
+            long shallowOffsetOfMaxTimestamp;
+            // Use the last offset when dealing with record batches
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                shallowOffsetOfMaxTimestamp = lastOffset;
+            else
+                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
+            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+        }
     }
 
     public void setProducerState(long pid, short epoch, int baseSequence) {

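A note on the info() change above: the new javadoc and implementation choose the shallow offset of the max timestamp based on compression, magic and timestamp type. The small Scala helper below restates that rule for illustration only; the helper name and parameter list are not part of the patch, and the maxTimestamp == NO_TIMESTAMP case (which always reports the last offset) is omitted for brevity.

    import org.apache.kafka.common.record.{CompressionType, RecordBatch, TimestampType}

    // Illustrative restatement (not code from this commit) of how the new
    // MemoryRecordsBuilder.info() picks the shallow offset of the max timestamp.
    def shallowOffsetOfMaxTimestamp(compressionType: CompressionType,
                                    magic: Byte,
                                    timestampType: TimestampType,
                                    baseOffset: Long,
                                    lastOffset: Long,
                                    offsetOfMaxTimestamp: Long): Long = {
      // Compressed data and message format V2+ always report the last offset of the batch.
      if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
        lastOffset
      else if (timestampType == TimestampType.LOG_APPEND_TIME)
        baseOffset // uncompressed V0/V1 with log append time: the first offset
      else
        offsetOfMaxTimestamp // uncompressed V0/V1 with create time: offset of the record with the max timestamp
    }
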
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index ae4a225..4fd03e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -58,7 +58,7 @@ public interface RecordBatch extends Iterable<Record> {
      * Used to indicate an unknown leader epoch, which will be the case when the record set is
      * first created by the producer.
      */
-    int UNKNOWN_PARTITION_LEADER_EPOCH = -1;
+    int NO_PARTITION_LEADER_EPOCH = -1;
 
     /**
      * Check whether the checksum of this batch is correct.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
index 2d49149..0965e36 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
@@ -18,14 +18,14 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.Errors;
 
-import static org.apache.kafka.common.record.RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
 /**
  * The offset, fetched from a leader, for a particular partition.
  */
 
 public class EpochEndOffset {
-    public static final long UNDEFINED_EPOCH_OFFSET = UNKNOWN_PARTITION_LEADER_EPOCH;
+    public static final long UNDEFINED_EPOCH_OFFSET = NO_PARTITION_LEADER_EPOCH;
     public static final int UNDEFINED_EPOCH = -1;
 
     private Errors error;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index ef48783..3cc8c20 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -50,7 +50,7 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         MemoryRecords records = builder.build();
         assertEquals(0, records.sizeInBytes());
         assertEquals(bufferOffset, buffer.position());
@@ -66,7 +66,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
-                TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+                TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
                 buffer.capacity());
         builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
         MemoryRecords records = builder.build();
@@ -86,7 +86,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -99,7 +99,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -112,7 +112,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -125,7 +125,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -138,7 +138,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = RecordBatch.NO_SEQUENCE;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test
@@ -154,7 +154,7 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         int uncompressedSize = 0;
         for (LegacyRecord record : records) {
@@ -185,7 +185,7 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         int uncompressedSize = 0;
         for (LegacyRecord record : records) {
@@ -211,7 +211,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
-                RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(0L, "b".getBytes(), "2".getBytes());
         builder.append(0L, "c".getBytes(), "3".getBytes());
@@ -220,7 +220,10 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
 
-        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+        if (compressionType != CompressionType.NONE)
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+        else
+            assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
 
         for (RecordBatch batch : records.batches()) {
             assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
@@ -237,7 +240,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(2L, "b".getBytes(), "2".getBytes());
         builder.append(1L, "c".getBytes(), "3".getBytes());
@@ -270,7 +273,7 @@ public class MemoryRecordsBuilderTest {
         ByteBuffer buffer = ByteBuffer.allocate(512);
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
                 TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
-                RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
+                RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
 
         assertFalse(builder.isFull());
         assertTrue(builder.hasRoomFor(0L, key, value));
@@ -296,7 +299,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(1L, "b".getBytes(), "2".getBytes());
 
@@ -324,7 +327,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index a2c761f..ea430b1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -100,7 +100,7 @@ public class MemoryRecordsTest {
                     assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
                     assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
                     assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
-                    assertEquals(RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, batch.partitionLeaderEpoch());
+                    assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, batch.partitionLeaderEpoch());
                     assertNull(batch.countOrNull());
                     if (magic == RecordBatch.MAGIC_VALUE_V0)
                         assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
@@ -222,9 +222,9 @@ public class MemoryRecordsTest {
         assertEquals(4, result.messagesRetained);
         assertEquals(buffer.limit(), result.bytesRead);
         assertEquals(filtered.limit(), result.bytesRetained);
-        if (magic > 0) {
+        if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp);
-            if (compression == CompressionType.NONE)
+            if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
                 assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
             else
                 assertEquals(5L, result.shallowOffsetOfMaxTimestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0e8cda8..d97dfa4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -603,6 +603,10 @@ class Log(@volatile var dir: File,
     val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
 
     for (batch <- records.batches.asScala) {
+      // we only validate V2 and higher to avoid potential compatibility issues with older clients
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
+        throw new InvalidRecordException(s"The baseOffset of the record batch should be 0, but it is ${batch.baseOffset}")
+
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.
@@ -621,8 +625,8 @@ class Log(@volatile var dir: File,
       if (batchSize > config.maxMessageSize) {
         BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
         BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"Message size is $batchSize bytes which exceeds the maximum configured " +
-          s"message size of ${config.maxMessageSize}.")
+        throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
+          s"value of ${config.maxMessageSize}.")
       }
 
       // check the validity of the message by checking CRC
@@ -654,7 +658,8 @@ class Log(@volatile var dir: File,
               firstOffset = lastEntry.firstOffset
               lastOffset = lastEntry.lastOffset
               maxTimestamp = lastEntry.timestamp
-              info(s"Detected a duplicate at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). Ignoring the incoming record.")
+              info(s"Detected a duplicate for partition $topicPartition at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). " +
+                "Ignoring the incoming record.")
             } else {
               val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
               producerAppendInfos.put(pid, producerAppendInfo)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index fa520ad..ae3d846 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer
 
 import kafka.common.LongRef
-import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record._
@@ -48,33 +48,46 @@ private[kafka] object LogValidator extends Logging {
                                                       now: Long,
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
-                                                      compactedTopic: Boolean = false,
-                                                      messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
-                                                      messageTimestampType: TimestampType,
-                                                      messageTimestampDiffMaxMs: Long,
-                                                      partitionLeaderEpoch: Int = RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH): ValidationAndOffsetAssignResult = {
+                                                      compactedTopic: Boolean,
+                                                      magic: Byte,
+                                                      timestampType: TimestampType,
+                                                      timestampDiffMaxMs: Long,
+                                                      partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
-      if (!records.hasMatchingMagic(messageFormatVersion))
-        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
-          messageTimestampDiffMaxMs, messageFormatVersion, partitionLeaderEpoch)
+      if (!records.hasMatchingMagic(magic))
+        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
+          timestampDiffMaxMs, magic, partitionLeaderEpoch)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
-        assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs, partitionLeaderEpoch)
+        assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
+          partitionLeaderEpoch)
     } else {
-
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
-        messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs, partitionLeaderEpoch)
+        magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch)
     }
   }
 
+  private def validateBatch(batch: RecordBatch): Unit = {
+    ensureNonTransactional(batch)
+  }
+
+  private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
+                             timestampDiffMaxMs: Long, compactedTopic: Boolean): Unit = {
+    if (!record.hasMagic(batch.magic))
+      throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}")
+    record.ensureValid()
+    ensureNotControlRecord(record)
+    validateKey(record, compactedTopic)
+    validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs)
+  }
+
   private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
                                                    offsetCounter: LongRef,
                                                    compactedTopic: Boolean,
                                                    now: Long,
                                                    timestampType: TimestampType,
-                                                   messageTimestampDiffMaxMs: Long,
+                                                   timestampDiffMaxMs: Long,
                                                    toMagicValue: Byte,
                                                    partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
@@ -90,12 +103,10 @@ private[kafka] object LogValidator extends Logging {
       offsetCounter.value, now, pid, epoch, sequence, partitionLeaderEpoch)
 
     for (batch <- records.batches.asScala) {
-      ensureNonTransactional(batch)
+      validateBatch(batch)
 
       for (record <- batch.asScala) {
-        ensureNotControlRecord(record)
-        validateKey(record, compactedTopic)
-        validateTimestamp(batch, record, now, timestampType, messageTimestampDiffMaxMs)
+        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
         builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
       }
     }
@@ -111,7 +122,7 @@ private[kafka] object LogValidator extends Logging {
 
   private def assignOffsetsNonCompressed(records: MemoryRecords,
                                          offsetCounter: LongRef,
-                                         currentTimestamp: Long,
+                                         now: Long,
                                          compactedTopic: Boolean,
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long,
@@ -119,40 +130,50 @@ private[kafka] object LogValidator extends Logging {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
+    var isMagicV2 = false
 
     for (batch <- records.batches.asScala) {
-      ensureNonTransactional(batch)
+      validateBatch(batch)
+
+      var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
+      var offsetOfMaxBatchTimestamp = -1L
 
       for (record <- batch.asScala) {
-        record.ensureValid()
-        ensureNotControlRecord(record)
-        validateKey(record, compactedTopic)
+        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
 
         val offset = offsetCounter.getAndIncrement()
-        if (batch.magic > RecordBatch.MAGIC_VALUE_V0) {
-          validateTimestamp(batch, record, currentTimestamp, timestampType, timestampDiffMaxMs)
-
-          if (record.timestamp > maxTimestamp) {
-            maxTimestamp = record.timestamp
-            offsetOfMaxTimestamp = offset
-          }
+        if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
+          maxBatchTimestamp = record.timestamp
+          offsetOfMaxBatchTimestamp = offset
         }
       }
 
+      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
+        maxTimestamp = maxBatchTimestamp
+        offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp
+      }
+
       batch.setLastOffset(offsetCounter.value - 1)
 
-      if(batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
         batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
-      // TODO: in the compressed path, we ensure that the batch max timestamp is correct.
-      //       We should either do the same or (better) let those two paths converge.
-      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.LOG_APPEND_TIME)
-        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, currentTimestamp)
+      if (batch.magic > RecordBatch.MAGIC_VALUE_V0) {
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+          batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, now)
+        else
+          batch.setMaxTimestamp(timestampType, maxBatchTimestamp)
+      }
+
+      isMagicV2 = batch.magic >= RecordBatch.MAGIC_VALUE_V2
     }
 
     if (timestampType == TimestampType.LOG_APPEND_TIME) {
-      maxTimestamp = currentTimestamp
-      offsetOfMaxTimestamp = initialOffset
+      maxTimestamp = now
+      if (isMagicV2)
+        offsetOfMaxTimestamp = offsetCounter.value - 1
+      else
+        offsetOfMaxTimestamp = initialOffset
     }
 
     ValidationAndOffsetAssignResult(
@@ -171,36 +192,32 @@ private[kafka] object LogValidator extends Logging {
    */
   def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                                  offsetCounter: LongRef,
-                                                 currentTimestamp: Long,
+                                                 now: Long,
                                                  sourceCodec: CompressionCodec,
                                                  targetCodec: CompressionCodec,
-                                                 compactedTopic: Boolean = false,
-                                                 messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
-                                                 messageTimestampType: TimestampType,
-                                                 messageTimestampDiffMaxMs: Long,
+                                                 compactedTopic: Boolean,
+                                                 magic: Byte,
+                                                 timestampType: TimestampType,
+                                                 timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
 
       // No in place assignment situation 1 and 2
-      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0
+      var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0
 
       var maxTimestamp = RecordBatch.NO_TIMESTAMP
       val expectedInnerOffset = new LongRef(0)
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
       for (batch <- records.batches.asScala) {
-        ensureNonTransactional(batch)
+        validateBatch(batch)
 
         for (record <- batch.asScala) {
-          if (!record.hasMagic(batch.magic))
-            throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}")
-
-          record.ensureValid()
-          ensureNotControlRecord(record)
-          validateKey(record, compactedTopic)
+          validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+          if (sourceCodec != NoCompressionCodec && record.isCompressed)
+            throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+              s"compression attribute set: $record")
 
-          if (!record.hasMagic(RecordBatch.MAGIC_VALUE_V0) && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0) {
-            // Validate the timestamp
-            validateTimestamp(batch, record, currentTimestamp, messageTimestampType, messageTimestampDiffMaxMs)
+          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
             if (record.offset != expectedInnerOffset.getAndIncrement())
@@ -209,12 +226,8 @@ private[kafka] object LogValidator extends Logging {
               maxTimestamp = record.timestamp
           }
 
-          if (sourceCodec != NoCompressionCodec && record.isCompressed)
-            throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
-              s"compression attribute set: $record")
-
           // No in place assignment situation 4
-          if (!record.hasMagic(messageFormatVersion))
+          if (!record.hasMagic(magic))
             inPlaceAssignment = false
 
           validatedRecords += record
@@ -223,14 +236,14 @@ private[kafka] object LogValidator extends Logging {
 
       if (!inPlaceAssignment) {
         val (pid, epoch, sequence) = {
-          // note that we only reassign offsets for requests coming straight from a producer. For records with MagicV2,
+          // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
           // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
-          // with older magic versions, this will always be NO_PRODUCER_ID, etc.
+          // with older magic versions, there will never be a producer id, etc.
           val first = records.batches.asScala.head
           (first.producerId, first.producerEpoch, first.baseSequence)
         }
-        buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
-          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
+        buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
+          validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -238,13 +251,13 @@ private[kafka] object LogValidator extends Logging {
 
         batch.setLastOffset(lastOffset)
 
-        if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
-          maxTimestamp = currentTimestamp
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+          maxTimestamp = now
 
-        if (messageFormatVersion >= RecordBatch.MAGIC_VALUE_V1)
-          batch.setMaxTimestamp(messageTimestampType, maxTimestamp)
+        if (magic >= RecordBatch.MAGIC_VALUE_V1)
+          batch.setMaxTimestamp(timestampType, maxTimestamp)
 
-        if(messageFormatVersion >= RecordBatch.MAGIC_VALUE_V2)
+        if (magic >= RecordBatch.MAGIC_VALUE_V2)
           batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
         ValidationAndOffsetAssignResult(validatedRecords = records,
@@ -290,7 +303,7 @@ private[kafka] object LogValidator extends Logging {
 
   private def validateKey(record: Record, compactedTopic: Boolean) {
     if (compactedTopic && !record.hasKey)
-      throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+      throw new InvalidRecordException("Compacted topic cannot accept message without key.")
   }
 
   /**
@@ -305,8 +318,8 @@ private[kafka] object LogValidator extends Logging {
     if (timestampType == TimestampType.CREATE_TIME
       && record.timestamp != RecordBatch.NO_TIMESTAMP
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
-      throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
-        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
+      throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
+        s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
       throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
         s"timestamp type to LogAppendTime.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
index a870b7d..054b2f6 100644
--- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala
+++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
@@ -47,7 +47,7 @@ private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset:
 }
 
 private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) {
-  // the initialEntry here is the last successfull appended batch. we validate incoming entries transitively, starting
+  // the initialEntry here is the last successful appended batch. we validate incoming entries transitively, starting
   // with the last appended entry.
   private var epoch = initialEntry.epoch
   private var firstSeq = initialEntry.firstSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 4a4727e..fe07a74 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.EpochEndOffset
 
 import scala.collection.mutable.ListBuffer
 
@@ -50,7 +49,7 @@ trait LeaderEpochCache {
   */
 class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
   private val lock = new ReentrantReadWriteLock()
-  private var epochs: ListBuffer[EpochEntry] = lock synchronized { ListBuffer(checkpoint.read(): _*) }
+  private var epochs: ListBuffer[EpochEntry] = lock.synchronized { ListBuffer(checkpoint.read(): _*) }
   private var cachedLatestEpoch: Option[Int] = None //epoch which has yet to be assigned to a message.
 
   /**
@@ -221,4 +220,4 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
 }
 
 // Mapping of epoch to the first offset of the subsequent epoch
-case class EpochEntry(epoch: Int, startOffset: Long)
\ No newline at end of file
+case class EpochEntry(epoch: Int, startOffset: Long)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 928b03d..3261626 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -276,9 +276,9 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+    def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.append(messageSet, assignOffsets = true)
+      log.append(createRecords, assignOffsets = true)
 
     val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
 
@@ -294,9 +294,9 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+    def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.append(messageSet, assignOffsets = true)
+      log.append(createRecords, assignOffsets = true)
 
     // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
     val segs = log.logSegments.toSeq
@@ -819,18 +819,19 @@ class LogCleanerTest extends JUnitSuite {
 
 
   def record(key: Int, value: Int, pid: Long = RecordBatch.NO_PRODUCER_ID, epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
-             sequence: Int = RecordBatch.NO_SEQUENCE): MemoryRecords = {
-    MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, sequence,
-      new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
+             sequence: Int = RecordBatch.NO_SEQUENCE,
+             partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
+    MemoryRecords.withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, CompressionType.NONE, pid, epoch, sequence,
+      partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  def record(key: Int, value: Array[Byte]) =
+  def record(key: Int, value: Array[Byte]): MemoryRecords =
     TestUtils.singletonRecords(key = key.toString.getBytes, value = value)
 
-  def unkeyedRecord(value: Int) =
+  def unkeyedRecord(value: Int): MemoryRecords =
     TestUtils.singletonRecords(value = value.toString.getBytes)
 
-  def tombstoneRecord(key: Int) = record(key, null)
+  def tombstoneRecord(key: Int): MemoryRecords = record(key, null)
 
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4fcf1c3..b61f261 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -24,7 +24,6 @@ import java.util.Properties
 import org.apache.kafka.common.errors._
 import kafka.api.ApiVersion
 import org.junit.Assert._
-import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.KafkaConfig
@@ -37,7 +36,7 @@ import org.easymock.EasyMock._
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-class LogTest extends JUnitSuite {
+class LogTest {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
@@ -69,7 +68,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTimeBasedLogRoll() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    def createRecords = TestUtils.singletonRecords("test".getBytes)
 
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
@@ -86,36 +85,36 @@ class LogTest extends JUnitSuite {
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     // Test the segment rolling behavior when messages do not have a timestamp.
     time.sleep(log.config.segmentMs + 1)
-    log.append(set)
+    log.append(createRecords)
     assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
 
-    log.append(set)
+    log.append(createRecords)
     assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments)
 
-    for(numSegments <- 3 until 5) {
+    for (numSegments <- 3 until 5) {
       time.sleep(log.config.segmentMs + 1)
-      log.append(set)
+      log.append(createRecords)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
-    // Append a message with timestamp to a segment whose first messgae do not have a timestamp.
-    val setWithTimestamp =
-      TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
-    log.append(setWithTimestamp)
+    // Append a message with timestamp to a segment whose first message do not have a timestamp.
+    val timestamp = time.milliseconds + log.config.segmentMs + 1
+    def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp)
+    log.append(createRecordsWithTimestamp)
     assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments)
 
     // Test the segment rolling behavior when messages have timestamps.
     time.sleep(log.config.segmentMs + 1)
-    log.append(setWithTimestamp)
+    log.append(createRecordsWithTimestamp)
     assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
 
     // move the wall clock beyond log rolling time
     time.sleep(log.config.segmentMs + 1)
-    log.append(setWithTimestamp)
+    log.append(createRecordsWithTimestamp)
     assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments)
 
-    val setWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    log.append(setWithExpiredTimestamp)
+    val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    log.append(recordWithExpiredTimestamp)
     assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments)
 
     val numSegments = log.numberOfSegments
@@ -161,66 +160,67 @@ class LogTest extends JUnitSuite {
 
     var seq = 0
     // Pad the beginning of the log.
-    for (i <- 0 to 5) {
+    for (_ <- 0 to 5) {
       val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
         pid = pid, epoch = epoch, sequence = seq)
       log.append(record, assignOffsets = true)
       seq = seq + 1
     }
     // Append an entry with multiple log records.
-    var record = TestUtils.records(List(
+    def createRecords = TestUtils.records(List(
       new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
       new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
       new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
     ), pid = pid, epoch = epoch, sequence = seq)
-    val multiEntryAppendInfo = log.append(record, assignOffsets = true)
+    val multiEntryAppendInfo = log.append(createRecords, assignOffsets = true)
     assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
-    seq = seq + 3
 
     // Append a Duplicate of the tail, when the entry at the tail has multiple records.
-    val dupMultiEntryAppendInfo = log.append(record, assignOffsets = true)
+    val dupMultiEntryAppendInfo = log.append(createRecords, assignOffsets = true)
     assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
       multiEntryAppendInfo.firstOffset, dupMultiEntryAppendInfo.firstOffset)
     assertEquals("Somehow appended a duplicate entry with multiple log records to the tail",
       multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset)
 
+    seq = seq + 3
+
     // Append a partial duplicate of the tail. This is not allowed.
     try {
-      record = TestUtils.records(
+      val records = TestUtils.records(
         List(
           new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
           new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
         pid = pid, epoch = epoch, sequence = seq - 2)
-      log.append(record, assignOffsets = true)
-      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+      log.append(records, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
         "in the middle of the log.")
     } catch {
-      case e: OutOfOrderSequenceException => // Good!
+      case _: OutOfOrderSequenceException => // Good!
     }
 
     // Append a Duplicate of an entry in the middle of the log. This is not allowed.
      try {
-      record = TestUtils.records(
+      val records = TestUtils.records(
         List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
         pid = pid, epoch = epoch, sequence = 1)
-      log.append(record, assignOffsets = true)
-      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a record " +
+      log.append(records, assignOffsets = true)
+      fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
         "in the middle of the log.")
     } catch {
-      case e: OutOfOrderSequenceException => // Good!
+      case _: OutOfOrderSequenceException => // Good!
     }
 
-    // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
-    record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+    // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
+    def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
       pid = pid, epoch = epoch, sequence = seq)
-    val origAppendInfo = log.append(record, assignOffsets = true)
-    val newAppendInfo = log.append(record, assignOffsets = true)
-    assertEquals("Inserted a duplicate record into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
-    assertEquals("Inserted a duplicate record into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
+    val origAppendInfo = log.append(createRecordsWithDuplicate, assignOffsets = true)
+    val newAppendInfo = log.append(createRecordsWithDuplicate, assignOffsets = true)
+    assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset)
+    assertEquals("Inserted a duplicate records into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset)
   }
 
   @Test
-  def testMulitplePidsPerMemoryRecord() : Unit = {
+  def testMultiplePidsPerMemoryRecord() : Unit = {
     val logProps = new Properties()
 
     // create a log
@@ -374,8 +374,8 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testSizeBasedLogRoll() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    val setSize = set.sizeInBytes
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val setSize = createRecords.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
@@ -388,9 +388,8 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
-    for (_ <- 1 to (msgPerSeg + 1)) {
-      log.append(set)
-    }
+    for (_ <- 1 to (msgPerSeg + 1))
+      log.append(createRecords)
     assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
   }
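
Many hunks in this file, including the ones above, replace a shared "val set" with a "def createRecords"; presumably this is so that every log.append(createRecords) builds a fresh MemoryRecords instance rather than re-appending one whose offsets an earlier append has already rewritten in place. A minimal, Kafka-independent sketch of the Scala evaluation rule this conversion leans on follows; the DefVsVal, once and fresh names are illustrative only, not from the patch.

    object DefVsVal extends App {
      var counter = 0
      val once: Int  = { counter += 1; counter }  // evaluated a single time, when the object body runs
      def fresh: Int = { counter += 1; counter }  // re-evaluated on every reference

      println((once, once))    // prints (1,1): the cached value both times
      println((fresh, fresh))  // prints (2,3): a new evaluation per call, as with createRecords
    }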
 
@@ -949,8 +948,8 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTruncateTo() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    val setSize = set.sizeInBytes
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val setSize = createRecords.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
 
@@ -962,7 +961,7 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
-      log.append(set)
+      log.append(createRecords)
 
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
@@ -983,7 +982,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", 0, log.size)
 
     for (_ <- 1 to msgPerSeg)
-      log.append(set)
+      log.append(createRecords)
 
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
@@ -992,7 +991,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", log.size, 0)
 
     for (_ <- 1 to msgPerSeg)
-      log.append(set)
+      log.append(createRecords)
 
     assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
     assertEquals("log size should be same as before", size, log.size)
@@ -1050,9 +1049,9 @@ class LogTest extends JUnitSuite {
     val bogusIndex2 = Log.indexFilename(logDir, 5)
     val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5)
 
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val log = new Log(logDir,
@@ -1069,7 +1068,7 @@ class LogTest extends JUnitSuite {
 
     // check that we can append to the log
     for (_ <- 0 until 10)
-      log.append(set)
+      log.append(createRecords)
 
     log.delete()
   }
@@ -1079,9 +1078,9 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReopenThenTruncate() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
     val config = LogConfig(logProps)
@@ -1096,7 +1095,7 @@ class LogTest extends JUnitSuite {
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for (_ <- 0 until 100)
-      log.append(set)
+      log.append(createRecords)
     log.close()
     log = new Log(logDir,
                   config,
@@ -1114,10 +1113,10 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAsyncDelete() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L)
     val asyncDeleteMs = 1000
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
     logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
@@ -1133,7 +1132,7 @@ class LogTest extends JUnitSuite {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(set)
+      log.append(createRecords)
 
     // files should be renamed
     val segments = log.logSegments.toArray
@@ -1159,9 +1158,9 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testOpenDeletesObsoleteFiles() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
     val config = LogConfig(logProps)
@@ -1174,7 +1173,7 @@ class LogTest extends JUnitSuite {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(set)
+      log.append(createRecords)
 
     // expire all segments
     log.deleteOldSegments()
@@ -1237,7 +1236,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val recoveryPoint = 50L
     for (_ <- 0 until 10) {
       // create a log and write some messages to it
@@ -1250,7 +1249,7 @@ class LogTest extends JUnitSuite {
                         time = time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
-        log.append(set)
+        log.append(createRecords)
       val records = log.logSegments.flatMap(_.log.records.asScala.toList).toList
       log.close()
 
@@ -1326,7 +1325,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val parentLogDir = logDir.getParentFile
     assertTrue("Data directory %s must exist", parentLogDir.isDirectory)
     val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
@@ -1341,7 +1340,7 @@ class LogTest extends JUnitSuite {
       scheduler = time.scheduler,
       time = time)
     for (_ <- 0 until 100)
-      log.append(set)
+      log.append(createRecords)
     log.close()
 
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
@@ -1428,9 +1427,9 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testDeleteOldSegmentsMethod() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
     val config = LogConfig(logProps)
@@ -1443,7 +1442,7 @@ class LogTest extends JUnitSuite {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(set)
+      log.append(createRecords)
 
     log.leaderEpochCache.assign(0, 40)
     log.leaderEpochCache.assign(1, 90)
@@ -1456,7 +1455,7 @@ class LogTest extends JUnitSuite {
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
-      log.append(set)
+      log.append(createRecords)
 
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
@@ -1467,11 +1466,11 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testLogDeletionAfterDeleteRecords() {
-    val set = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(set.sizeInBytes)
+    def createRecords = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(createRecords.sizeInBytes)
 
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
     assertEquals("should have 3 segments", 3, log.numberOfSegments)
     assertEquals(log.logStartOffset, 0)
 
@@ -1497,12 +1496,12 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldDeleteSizeBasedSegments() {
-    val set = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+    def createRecords = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
 
     log.deleteOldSegments
     assertEquals("should have 2 segments", 2,log.numberOfSegments)
@@ -1510,12 +1509,12 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
-    val set = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
+    def createRecords = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 15)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
 
     log.deleteOldSegments
     assertEquals("should have 3 segments", 3,log.numberOfSegments)
@@ -1523,12 +1522,12 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
-    val set = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
-    val log = createLog(set.sizeInBytes, retentionMs = 10000)
+    def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
+    val log = createLog(createRecords.sizeInBytes, retentionMs = 10000)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
 
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
@@ -1536,12 +1535,12 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
-    val set = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds)
-    val log = createLog(set.sizeInBytes, retentionMs = 10000000)
+    def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds)
+    val log = createLog(createRecords.sizeInBytes, retentionMs = 10000000)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
 
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
@@ -1549,14 +1548,14 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
-    val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
-    val log = createLog(set.sizeInBytes,
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val log = createLog(createRecords.sizeInBytes,
       retentionMs = 10000,
       cleanupPolicy = "compact")
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
 
     // mark oldest segment as older the retention.ms
     log.logSegments.head.lastModified = time.milliseconds - 20000
@@ -1568,14 +1567,14 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
-    val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
-    val log = createLog(set.sizeInBytes,
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
+    val log = createLog(createRecords.sizeInBytes,
       retentionMs = 10000,
       cleanupPolicy = "compact,delete")
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
-      log.append(set)
+      log.append(createRecords)
 
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
@@ -1583,25 +1582,24 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldApplyEpochToMessageOnAppendIfLeader() {
-    val messageIds = (0 until 50).toArray
-    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
+    val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes))
 
     //Given this partition is on leader epoch 72
     val epoch = 72
     val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     //When appending messages as a leader (i.e. assignOffsets = true)
-    for (i <- records.indices)
+    for (record <- records)
       log.append(
-        MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)),
+        MemoryRecords.withRecords(CompressionType.NONE, record),
         leaderEpochCache = mockCache(epoch),
         assignOffsets = true
       )
 
     //Then leader epoch should be set on messages
     for (i <- records.indices) {
-      val read = log.read(i, 100, Some(i+1)).records.batches().iterator.next()
-      assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch())
+      val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+      assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch)
     }
   }
 
@@ -1615,7 +1613,7 @@ class LogTest extends JUnitSuite {
     //Given each message has an offset & epoch, as msgs from leader would
     def recordsForEpoch(i: Int): MemoryRecords = {
       val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i))
-      recs.batches().asScala.foreach{record =>
+      recs.batches.asScala.foreach { record =>
         record.setPartitionLeaderEpoch(42)
         record.setLastOffset(i)
       }
@@ -1623,7 +1621,7 @@ class LogTest extends JUnitSuite {
     }
 
     //Verify we save the epoch to the cache.
-    expect(cache.assign(EasyMock.eq(42), anyInt())).times(records.size)
+    expect(cache.assign(EasyMock.eq(42), anyInt)).times(records.size)
     replay(cache)
 
     val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
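
The hunk above uses EasyMock's record/replay cycle: expectations are recorded with expect(...).times(n), the mock is switched into replay mode with replay, and verify asserts afterwards that the recorded expectations were met. A small self-contained sketch of that cycle is shown below; the LeaderEpochs trait and the EasyMockSketch object are hypothetical stand-ins for illustration, not Kafka or patch code.

    import org.easymock.EasyMock
    import org.easymock.EasyMock.{anyInt, expect, replay, verify}

    // Hypothetical single-method trait standing in for the mocked cache.
    trait LeaderEpochs { def assign(epoch: Int, offset: Int): Unit }

    object EasyMockSketch extends App {
      val cache = EasyMock.createMock(classOf[LeaderEpochs])
      expect(cache.assign(EasyMock.eq(42), anyInt())).times(3)  // record phase: expect exactly three calls with epoch 42
      replay(cache)                                             // switch the mock into replay mode

      (0 until 3).foreach(offset => cache.assign(42, offset))   // exercise the mock
      verify(cache)                                             // passes only if the recorded expectations were met
    }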
@@ -1637,13 +1635,13 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldTruncateLeaderEpochsWhenDeletingSegments() {
-    val set = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+    def createRecords = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
     val cache = epochCache(log)
 
     // Given three segments of 5 messages each
     for (e <- 0 until 15) {
-      log.append(set)
+      log.append(createRecords)
     }
 
     //Given epochs
@@ -1660,13 +1658,13 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
-    val set = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
+    def createRecords = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
     val cache = epochCache(log)
 
     // Given three segments of 5 messages each
     for (e <- 0 until 15) {
-      log.append(set)
+      log.append(createRecords)
     }
 
     //Given epochs
@@ -1683,14 +1681,14 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
-    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * set.sizeInBytes).toString)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString)
     val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     val cache = epochCache(log)
 
     //Given 2 segments, 10 messages per segment
     for (epoch <- 1 to 20)
-      log.append(set)
+      log.append(createRecords)
 
     //Simulate some leader changes at specific offsets
     cache.assign(0, 0)
@@ -1704,25 +1702,25 @@ class LogTest extends JUnitSuite {
     log.truncateTo(log.logEndOffset)
 
     //Then no change
-    assertEquals(3, cache.epochEntries().size)
+    assertEquals(3, cache.epochEntries.size)
 
     //When truncate
     log.truncateTo(11)
 
     //Then no change
-    assertEquals(2, cache.epochEntries().size)
+    assertEquals(2, cache.epochEntries.size)
 
     //When truncate
     log.truncateTo(10)
 
     //Then
-    assertEquals(1, cache.epochEntries().size)
+    assertEquals(1, cache.epochEntries.size)
 
     //When truncate all
     log.truncateTo(0)
 
     //Then
-    assertEquals(0, cache.epochEntries().size)
+    assertEquals(0, cache.epochEntries.size)
   }
 
   /**
@@ -1800,7 +1798,7 @@ class LogTest extends JUnitSuite {
 
   private def mockCache(epoch: Int) = {
     val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
-    EasyMock.expect(cache.latestUsedEpoch()).andReturn(epoch).anyTimes()
+    EasyMock.expect(cache.latestUsedEpoch).andReturn(epoch).anyTimes
     EasyMock.replay(cache)
     cache
   }