Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:33 UTC

[6/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 6482529..551d820 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -35,18 +35,22 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(value = Parameterized.class)
 public class RecordTest {
 
+    private byte magic;
     private long timestamp;
     private ByteBuffer key;
     private ByteBuffer value;
     private CompressionType compression;
+    private TimestampType timestampType;
     private Record record;
 
-    public RecordTest(long timestamp, byte[] key, byte[] value, CompressionType compression) {
+    public RecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
+        this.magic = magic;
         this.timestamp = timestamp;
+        this.timestampType = TimestampType.CREATE_TIME;
         this.key = key == null ? null : ByteBuffer.wrap(key);
         this.value = value == null ? null : ByteBuffer.wrap(value);
         this.compression = compression;
-        this.record = new Record(timestamp, key, value, compression);
+        this.record = Record.create(magic, timestamp, key, value, compression, timestampType);
     }
 
     @Test
@@ -56,22 +60,33 @@ public class RecordTest {
         assertEquals(key, record.key());
         if (key != null)
             assertEquals(key.limit(), record.keySize());
-        assertEquals(Record.CURRENT_MAGIC_VALUE, record.magic());
+        assertEquals(magic, record.magic());
         assertEquals(value, record.value());
         if (value != null)
             assertEquals(value.limit(), record.valueSize());
+        if (magic > 0) {
+            assertEquals(timestamp, record.timestamp());
+            assertEquals(timestampType, record.timestampType());
+        } else {
+            assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+            assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+        }
     }
 
     @Test
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
+
+        byte attributes = Record.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
         assertEquals(record.checksum(), Record.computeChecksum(
-            this.timestamp,
-            this.key == null ? null : this.key.array(),
-            this.value == null ? null : this.value.array(),
-            this.compression, 0, -1));
+                magic,
+                attributes,
+                this.timestamp,
+                this.key == null ? null : this.key.array(),
+                this.value == null ? null : this.value.array()
+        ));
         assertTrue(record.isValid());
-        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
+        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.sizeInBytes(); i++) {
             Record copy = copyOf(record);
             copy.buffer().put(i, (byte) 69);
             assertFalse(copy.isValid());
@@ -85,7 +100,7 @@ public class RecordTest {
     }
 
     private Record copyOf(Record record) {
-        ByteBuffer buffer = ByteBuffer.allocate(record.size());
+        ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
         record.buffer().put(buffer);
         buffer.rewind();
         record.buffer().rewind();
@@ -101,12 +116,13 @@ public class RecordTest {
     public static Collection<Object[]> data() {
         byte[] payload = new byte[1000];
         Arrays.fill(payload, (byte) 1);
-        List<Object[]> values = new ArrayList<Object[]>();
-        for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
-            for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
-                for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
-                    for (CompressionType compression : CompressionType.values())
-                        values.add(new Object[] {timestamp, key, value, compression});
+        List<Object[]> values = new ArrayList<>();
+        for (byte magic : Arrays.asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+            for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
+                for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+                    for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
+                        for (CompressionType compression : CompressionType.values())
+                            values.add(new Object[] {magic, timestamp, key, value, compression});
         return values;
     }
 
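A minimal, illustrative sketch of the new Record.create factory that the updated test drives, using only calls visible in the diff above. The exact overload (magic, timestamp, key, value, compression, timestamp type) is inferred from the test constructor, and the class is placed in the test's package in case some of these helpers are not public outside it.

    package org.apache.kafka.common.record;

    // Illustrative only: same package as RecordTest, since the factory may not be public elsewhere.
    public class RecordCreateSketch {
        public static void main(String[] args) {
            // A magic v1 record with an explicit create-time timestamp, via the factory
            // the updated test constructor calls.
            Record v1 = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(),
                    "key".getBytes(), "value".getBytes(), CompressionType.NONE, TimestampType.CREATE_TIME);
            System.out.println(v1.timestamp() + " " + v1.timestampType());

            // A magic v0 record has no timestamp field, so it reports the sentinel values
            // asserted in testFields above.
            Record v0 = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP,
                    "key".getBytes(), "value".getBytes(), CompressionType.NONE, TimestampType.CREATE_TIME);
            System.out.println(v0.timestamp() == Record.NO_TIMESTAMP);                     // true
            System.out.println(v0.timestampType() == TimestampType.NO_TIMESTAMP_TYPE);     // true

            // The stored CRC covers the bytes after the CRC field; an untouched record validates.
            v1.ensureValid();
            System.out.println(v1.sizeInBytes() + " bytes, checksum " + v1.checksum());
        }
    }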

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index aabadfe..427c743 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -20,35 +20,29 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class SimpleRecordTest {
 
     /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
-    @Test
+    @Test(expected = InvalidRecordException.class)
     public void testIsValidWithTooSmallBuffer() {
         ByteBuffer buffer = ByteBuffer.allocate(2);
         Record record = new Record(buffer);
         assertFalse(record.isValid());
-        try {
-            record.ensureValid();
-            fail("InvalidRecordException should have been thrown");
-        } catch (InvalidRecordException e) { }
+        record.ensureValid();
     }
 
-    @Test
+    @Test(expected = InvalidRecordException.class)
     public void testIsValidWithChecksumMismatch() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
         // set checksum
         buffer.putInt(2);
         Record record = new Record(buffer);
         assertFalse(record.isValid());
-        try {
-            record.ensureValid();
-            fail("InvalidRecordException should have been thrown");
-        } catch (InvalidRecordException e) { }
+        record.ensureValid();
     }
 
     @Test
@@ -63,4 +57,40 @@ public class SimpleRecordTest {
         record.ensureValid();
     }
 
+    @Test
+    public void testConvertFromV0ToV1() {
+        byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
+        byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
+
+        for (int i = 0; i < keys.length; i++) {
+            Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]);
+            Record converted = record.convert(Record.MAGIC_VALUE_V1);
+
+            assertEquals(Record.MAGIC_VALUE_V1, converted.magic());
+            assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+            assertEquals(record.key(), converted.key());
+            assertEquals(record.value(), converted.value());
+            assertTrue(record.isValid());
+            assertEquals(record.convertedSize(Record.MAGIC_VALUE_V1), converted.sizeInBytes());
+        }
+    }
+
+    @Test
+    public void testConvertFromV1ToV0() {
+        byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
+        byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
+
+        for (int i = 0; i < keys.length; i++) {
+            Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]);
+            Record converted = record.convert(Record.MAGIC_VALUE_V0);
+
+            assertEquals(Record.MAGIC_VALUE_V0, converted.magic());
+            assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+            assertEquals(record.key(), converted.key());
+            assertEquals(record.value(), converted.value());
+            assertTrue(record.isValid());
+            assertEquals(record.convertedSize(Record.MAGIC_VALUE_V0), converted.sizeInBytes());
+        }
+    }
+
 }
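The conversion behaviour the two new tests check, restated as a small standalone sketch; it assumes only the create, convert and convertedSize methods used above and lives in the same package as the test in case they are not public elsewhere.

    package org.apache.kafka.common.record;

    // Illustrative only: up- and down-conversion between the v0 and v1 message formats.
    public class RecordConvertSketch {
        public static void main(String[] args) {
            // Up-conversion: the record gains the v1 layout but has no source timestamp to carry over.
            Record v0 = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "a".getBytes(), "1".getBytes());
            Record up = v0.convert(Record.MAGIC_VALUE_V1);
            System.out.println(up.magic() + " " + (up.timestamp() == Record.NO_TIMESTAMP));   // 1 true

            // Down-conversion: magic v0 has nowhere to store the timestamp, so it is dropped.
            Record v1 = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), "a".getBytes(), "1".getBytes());
            Record down = v1.convert(Record.MAGIC_VALUE_V0);
            System.out.println(down.timestamp() == Record.NO_TIMESTAMP);                      // true

            // convertedSize predicts the converted size without materializing the new record.
            System.out.println(v0.convertedSize(Record.MAGIC_VALUE_V1) == up.sizeInBytes());  // true
        }
    }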

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
new file mode 100644
index 0000000..4759715
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampTypeTest {
+
+    @Test
+    public void toAndFromAttributesCreateTime() {
+        byte attributes = TimestampType.CREATE_TIME.updateAttributes((byte) 0);
+        assertEquals(TimestampType.CREATE_TIME, TimestampType.forAttributes(attributes));
+    }
+
+    @Test
+    public void toAndFromAttributesLogAppendTime() {
+        byte attributes = TimestampType.LOG_APPEND_TIME.updateAttributes((byte) 0);
+        assertEquals(TimestampType.LOG_APPEND_TIME, TimestampType.forAttributes(attributes));
+    }
+
+}
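A small illustrative sketch tying the attribute round-trip above to Record.computeAttributes, which RecordTest.testChecksum uses when recomputing a checksum; it only asserts that the timestamp type survives the round trip, and it sits in the same package as the tests in case these helpers are not public.

    package org.apache.kafka.common.record;

    // Illustrative only: the timestamp type is folded into the record attribute byte and read back out.
    public class TimestampAttributesSketch {
        public static void main(String[] args) {
            byte attrs = TimestampType.LOG_APPEND_TIME.updateAttributes((byte) 0);
            System.out.println(TimestampType.forAttributes(attrs) == TimestampType.LOG_APPEND_TIME);   // true

            // Record.computeAttributes builds the same byte from magic, compression and timestamp type.
            byte recordAttrs = Record.computeAttributes(Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME);
            System.out.println(TimestampType.forAttributes(recordAttrs) == TimestampType.CREATE_TIME);  // true
        }
    }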

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index d3280e5..4e80b61 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -23,8 +23,10 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Utils;
 
 import javax.xml.bind.DatatypeConverter;
@@ -35,6 +37,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -46,6 +49,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -185,13 +189,13 @@ public class TestUtils {
     public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
         int bufferSize = 0;
         for (final Record record : records)
-            bufferSize += Records.LOG_OVERHEAD + record.size();
+            bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
         final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        final MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
+        long nextOffset = offset;
         for (final Record record : records)
-            memoryRecords.append(offset, record);
-        memoryRecords.close();
-        return memoryRecords.buffer();
+            builder.append(nextOffset++, record);
+        return builder.build().buffer();
     }
 
     public static Properties producerConfig(final String bootstrapServers,
@@ -309,4 +313,22 @@ public class TestUtils {
             fail(clusterId + " cannot be converted back to UUID.");
         }
     }
+
+    /**
+     * Throw an exception if the two iterators are of differing lengths or contain
+     * different messages on their Nth element
+     */
+    public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) {
+        while (s1.hasNext() && s2.hasNext())
+            assertEquals(s1.next(), s2.next());
+        assertFalse("Iterators have uneven length--first has more", s1.hasNext());
+        assertFalse("Iterators have uneven length--second has more", s2.hasNext());
+    }
+
+    public static <T> List<T> toList(Iterator<T> iterator) {
+        List<T> res = new ArrayList<>();
+        while (iterator.hasNext())
+            res.add(iterator.next());
+        return res;
+    }
 }
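The reworked partitionRecordsBuffer, restated as a standalone sketch of the MemoryRecordsBuilder flow: size the buffer from the records, append them with sequential offsets, then build. The helper name and the sample record in main are illustrative; the sizing and the builder calls mirror the code above.

    package org.apache.kafka.common.record;

    import java.nio.ByteBuffer;

    // Illustrative only: the builder sequence that replaces MemoryRecords.emptyRecords(...)/append(...)/close().
    public class RecordsBufferSketch {

        public static ByteBuffer recordsBuffer(long baseOffset, CompressionType compressionType, Record... records) {
            // Size the buffer from the uncompressed payload: log overhead (offset + size) per record.
            int bufferSize = 0;
            for (Record record : records)
                bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);

            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
            long offset = baseOffset;
            for (Record record : records)
                builder.append(offset++, record);
            return builder.build().buffer();
        }

        public static void main(String[] args) {
            Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), "k".getBytes(), "v".getBytes());
            ByteBuffer buffer = recordsBuffer(0L, CompressionType.NONE, record);
            System.out.println("built a buffer with limit " + buffer.limit());
        }
    }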

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 895c1b1..4052639 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import kafka.message.Message
+import org.apache.kafka.common.record.Record
 
 /**
  * This class contains the different Kafka versions.
@@ -87,54 +87,54 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
 // Keep the IDs in order of versions
 case object KAFKA_0_8_0 extends ApiVersion {
   val version: String = "0.8.0.X"
-  val messageFormatVersion: Byte = Message.MagicValue_V0
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
   val id: Int = 0
 }
 
 case object KAFKA_0_8_1 extends ApiVersion {
   val version: String = "0.8.1.X"
-  val messageFormatVersion: Byte = Message.MagicValue_V0
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
   val id: Int = 1
 }
 
 case object KAFKA_0_8_2 extends ApiVersion {
   val version: String = "0.8.2.X"
-  val messageFormatVersion: Byte = Message.MagicValue_V0
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
   val id: Int = 2
 }
 
 case object KAFKA_0_9_0 extends ApiVersion {
   val version: String = "0.9.0.X"
-  val messageFormatVersion: Byte = Message.MagicValue_V0
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
   val id: Int = 3
 }
 
 case object KAFKA_0_10_0_IV0 extends ApiVersion {
   val version: String = "0.10.0-IV0"
-  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
   val id: Int = 4
 }
 
 case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val version: String = "0.10.0-IV1"
-  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
   val id: Int = 5
 }
 
 case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val version: String = "0.10.1-IV0"
-  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
   val id: Int = 6
 }
 
 case object KAFKA_0_10_1_IV1 extends ApiVersion {
   val version: String = "0.10.1-IV1"
-  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
   val id: Int = 7
 }
 
 case object KAFKA_0_10_1_IV2 extends ApiVersion {
   val version: String = "0.10.1-IV2"
-  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
   val id: Int = 8
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7e52a91..9eb92cd 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -25,7 +25,6 @@ import kafka.log.LogConfig
 import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
-import kafka.message.ByteBufferMessageSet
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -34,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.PartitionState
 import org.apache.kafka.common.utils.Time
 
@@ -190,7 +190,7 @@ class Partition(val topic: String,
       allReplicas.foreach(replica => getOrCreateReplica(replica))
       val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
       // remove assigned replicas that have been removed by the controller
-      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
+      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
       leaderEpoch = partitionStateInfo.leaderEpoch
       zkVersion = partitionStateInfo.zkVersion
@@ -440,7 +440,7 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
+  def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
@@ -455,7 +455,7 @@ class Partition(val topic: String,
               .format(topic, partitionId, inSyncSize, minIsr))
           }
 
-          val info = log.append(messages, assignOffsets = true)
+          val info = log.append(records, assignOffsets = true)
           // probably unblock some follower fetch requests since log end offset has been updated
           replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
@@ -480,7 +480,7 @@ class Partition(val topic: String,
       newLeaderAndIsr, controllerEpoch, zkVersion)
 
     if(updateSucceeded) {
-      replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
+      replicaManager.recordIsrChange(TopicAndPartition(topic, partitionId))
       inSyncReplicas = newIsr
       zkVersion = newVersion
       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c47efb7..f702b9d 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -17,14 +17,16 @@
 
 package kafka.consumer
 
-import kafka.api.{OffsetRequest, Request, FetchRequestBuilder, FetchResponsePartitionData}
+import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
 import kafka.message.ByteBufferMessageSet
-import kafka.server.{PartitionFetchState, AbstractFetcherThread}
+import kafka.server.{AbstractFetcherThread, PartitionFetchState}
 import kafka.common.{ErrorMapping, TopicAndPartition}
+
 import scala.collection.Map
 import ConsumerFetcherThread._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.MemoryRecords
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
@@ -81,7 +83,7 @@ class ConsumerFetcherThread(name: String,
       case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
       case _ => OffsetRequest.LatestTime
     }
-    val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
+    val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
     val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
     val pti = partitionMap(topicPartition)
     pti.resetFetchOffset(newOffset)
@@ -123,7 +125,7 @@ object ConsumerFetcherThread {
 
   class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
     def errorCode: Short = underlying.error
-    def toByteBufferMessageSet: ByteBufferMessageSet = underlying.messages.asInstanceOf[ByteBufferMessageSet]
+    def toRecords: MemoryRecords = underlying.messages.asInstanceOf[ByteBufferMessageSet].asRecords
     def highWatermark: Long = underlying.hw
     def exception: Option[Throwable] =
       if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode))

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 0c53345..db40482 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -458,14 +458,14 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String,
                          partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
     if (!isActive.get) {
-      partitions.map { case topicPartition =>
+      partitions.map { topicPartition =>
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
     } else if (!isCoordinatorForGroup(groupId)) {
       debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
-      partitions.map { case topicPartition =>
+      partitions.map { topicPartition =>
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
     } else if (isCoordinatorLoadingInProgress(groupId)) {
-      partitions.map { case topicPartition =>
+      partitions.map { topicPartition =>
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
     } else {
       // return offsets blindly regardless the current group state since the group may be using

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e55bcaa..a97b527 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -17,38 +17,31 @@
 
 package kafka.coordinator
 
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
-import org.apache.kafka.common.protocol.types.Type.STRING
-import org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING
-import org.apache.kafka.common.protocol.types.Type.INT32
-import org.apache.kafka.common.protocol.types.Type.INT64
-import org.apache.kafka.common.protocol.types.Type.BYTES
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import kafka.utils._
-import kafka.common._
-import kafka.message._
-import kafka.log.FileMessageSet
-import kafka.metrics.KafkaMetricsGroup
-import kafka.common.TopicAndPartition
-import kafka.common.MessageFormatter
-import kafka.server.ReplicaManager
-
-import scala.collection._
 import java.io.PrintStream
 import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.common.{MessageFormatter, TopicAndPartition, _}
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
+import kafka.utils._
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.types.Type._
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.collection.JavaConverters._
+import scala.collection._
 
 class GroupMetadataManager(val brokerId: Int,
                            val interBrokerProtocolVersion: ApiVersion,
@@ -57,6 +50,8 @@ class GroupMetadataManager(val brokerId: Int,
                            zkUtils: ZkUtils,
                            time: Time) extends Logging with KafkaMetricsGroup {
 
+  private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
+
   private val groupMetadataCache = new Pool[String, GroupMetadata]
 
   /* lock protecting access to loading and owned partition sets */
@@ -135,13 +130,11 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Errors => Unit): Option[DelayedStore] = {
-    val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
-    magicValueAndTimestampOpt match {
-      case Some((magicValue, timestamp)) =>
+    getMagicAndTimestamp(partitionFor(group.groupId)) match {
+      case Some((magicValue, timestampType, timestamp)) =>
         val groupMetadataValueVersion = {
           if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
             0.toShort
@@ -149,17 +142,12 @@ class GroupMetadataManager(val brokerId: Int,
             GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
         }
 
-        val message = new Message(
-          key = GroupMetadataManager.groupMetadataKey(group.groupId),
-          bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
-          timestamp = timestamp,
-          magicValue = magicValue)
+        val record = Record.create(magicValue, timestampType, timestamp,
+          GroupMetadataManager.groupMetadataKey(group.groupId),
+          GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion))
 
         val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
-
-        val groupMetadataMessageSet = Map(groupMetadataPartition ->
-          new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
-
+        val groupMetadataRecords = Map(groupMetadataPartition -> MemoryRecords.withRecords(timestampType, compressionType, record))
         val generationId = group.generationId
 
         // set the callback function to insert the created group into cache after log append completed
@@ -212,7 +200,7 @@ class GroupMetadataManager(val brokerId: Int,
 
           responseCallback(responseError)
         }
-        Some(DelayedStore(groupMetadataMessageSet, putCacheCallback))
+        Some(DelayedStore(groupMetadataRecords, putCacheCallback))
 
       case None =>
         responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
@@ -222,11 +210,11 @@ class GroupMetadataManager(val brokerId: Int,
 
   def store(delayedStore: DelayedStore) {
     // call replica manager to append the group message
-    replicaManager.appendMessages(
+    replicaManager.appendRecords(
       config.offsetCommitTimeoutMs.toLong,
       config.offsetCommitRequiredAcks,
       true, // allow appending to internal offset topic
-      delayedStore.messageSet,
+      delayedStore.partitionRecords,
       delayedStore.callback)
   }
 
@@ -244,22 +232,17 @@ class GroupMetadataManager(val brokerId: Int,
     }
 
     // construct the message set to append
-    val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
-    magicValueAndTimestampOpt match {
-      case Some((magicValue, timestamp)) =>
-        val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-          new Message(
-            key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
-            bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
-            timestamp = timestamp,
-            magicValue = magicValue
-          )
+    getMagicAndTimestamp(partitionFor(group.groupId)) match {
+      case Some((magicValue, timestampType, timestamp)) =>
+        val records = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+          Record.create(magicValue, timestampType, timestamp,
+            GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
+            GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
         }.toSeq
 
         val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
 
-        val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
-          new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+        val entries = Map(offsetTopicPartition -> MemoryRecords.withRecords(timestampType, compressionType, records:_*))
 
         // set the callback function to insert offsets into cache after log append completed
         def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
@@ -330,7 +313,7 @@ class GroupMetadataManager(val brokerId: Int,
           group.prepareOffsetCommit(offsetMetadata)
         }
 
-        Some(DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback))
+        Some(DelayedStore(entries, putCacheCallback))
 
       case None =>
         val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
@@ -412,28 +395,30 @@ class GroupMetadataManager(val brokerId: Int,
 
             while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
               buffer.clear()
-              val messages = log.read(currOffset, config.loadBufferSize, minOneMessage = true).messageSet.asInstanceOf[FileMessageSet]
-              messages.readInto(buffer, 0)
-              val messageSet = new ByteBufferMessageSet(buffer)
-              messageSet.foreach { msgAndOffset =>
-                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
-                val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
+              val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords]
+              fileRecords.readInto(buffer, 0)
+
+              MemoryRecords.readableRecords(buffer).deepIterator.asScala.foreach { entry =>
+                val record = entry.record
+
+                require(record.hasKey, "Offset entry key should not be null")
+                val baseKey = GroupMetadataManager.readMessageKey(record.key)
 
                 if (baseKey.isInstanceOf[OffsetKey]) {
                   // load offset
                   val key = baseKey.key.asInstanceOf[GroupTopicPartition]
-                  if (msgAndOffset.message.payload == null) {
+                  if (record.hasNullValue) {
                     loadedOffsets.remove(key)
                     removedOffsets.add(key)
                   } else {
-                    val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
+                    val value = GroupMetadataManager.readOffsetMessageValue(record.value)
                     loadedOffsets.put(key, value)
                     removedOffsets.remove(key)
                   }
                 } else {
                   // load group metadata
                   val groupId = baseKey.key.asInstanceOf[String]
-                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
+                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
                   if (groupMetadata != null) {
                     trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
                     removedGroups.remove(groupId)
@@ -444,7 +429,7 @@ class GroupMetadataManager(val brokerId: Int,
                   }
                 }
 
-                currOffset = msgAndOffset.nextOffset
+                currOffset = entry.nextOffset
               }
             }
 
@@ -467,8 +452,8 @@ class GroupMetadataManager(val brokerId: Int,
 
             removedGroups.foreach { groupId =>
               if (groupMetadataCache.contains(groupId))
-                throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " +
-                  s"loading partition ${topicPartition}")
+                throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+                  s"loading partition $topicPartition")
             }
 
             if (!shuttingDown.get())
@@ -572,15 +557,15 @@ class GroupMetadataManager(val brokerId: Int,
       }
 
       val offsetsPartition = partitionFor(groupId)
-      getMessageFormatVersionAndTimestamp(offsetsPartition) match {
-        case Some((magicValue, timestamp)) =>
+      getMagicAndTimestamp(offsetsPartition) match {
+        case Some((magicValue, timestampType, timestamp)) =>
           val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
           partitionOpt.foreach { partition =>
             val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
             val tombstones = expiredOffsets.map { case (topicPartition, offsetAndMetadata) =>
               trace(s"Removing expired offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
               val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
-              new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+              Record.create(magicValue, timestampType, timestamp, commitKey, null)
             }.toBuffer
             trace(s"Marked ${expiredOffsets.size} offsets in $appendPartition for deletion.")
 
@@ -590,8 +575,7 @@ class GroupMetadataManager(val brokerId: Int,
               // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
               // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
               // retry removing this group.
-              tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
-                timestamp = timestamp, magicValue = magicValue)
+              tombstones += Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), null)
               trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
             }
 
@@ -599,7 +583,7 @@ class GroupMetadataManager(val brokerId: Int,
               try {
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
-                partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+                partition.appendRecordsToLeader(MemoryRecords.withRecords(timestampType, compressionType, tombstones: _*))
                 offsetsRemoved += expiredOffsets.size
                 trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId")
               } catch {
@@ -663,16 +647,11 @@ class GroupMetadataManager(val brokerId: Int,
    * @param   partition  Partition of GroupMetadataTopic
    * @return  Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
    */
-  private def getMessageFormatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] = {
+  private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = {
     val groupMetadataTopicAndPartition = TopicAndPartition(Topic.GroupMetadataTopicName, partition)
-    replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).map { messageFormatVersion =>
-      val timestamp = {
-        if (messageFormatVersion == Message.MagicValue_V0)
-          Message.NoTimestamp
-        else
-          time.milliseconds()
-      }
-      (messageFormatVersion, timestamp)
+    replicaManager.getMagicAndTimestampType(groupMetadataTopicAndPartition).map { case (messageFormatVersion, timestampType) =>
+      val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds()
+      (messageFormatVersion, timestampType, timestamp)
     }
   }
 
@@ -964,7 +943,7 @@ object GroupMetadataManager {
    * @return an offset-metadata object from the message
    */
   def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
-    if(buffer == null) { // tombstone
+    if (buffer == null) { // tombstone
       null
     } else {
       val version = buffer.getShort
@@ -997,7 +976,7 @@ object GroupMetadataManager {
    * @return a group metadata object from the message
    */
   def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
-    if(buffer == null) { // tombstone
+    if (buffer == null) { // tombstone
       null
     } else {
       val version = buffer.getShort
@@ -1016,23 +995,22 @@ object GroupMetadataManager {
         group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
 
-        memberMetadataArray.foreach {
-          case memberMetadataObj =>
-            val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
-            val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
-            val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
-            val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
-            val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
-            val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
+        memberMetadataArray.foreach { memberMetadataObj =>
+          val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
+          val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+          val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
+          val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
+          val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
+          val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
 
-            val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
+          val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
 
-            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
-              protocolType, List((group.protocol, subscription)))
+          val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+            protocolType, List((group.protocol, subscription)))
 
-            member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
+          member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
 
-            group.add(memberId, member)
+          group.add(memberId, member)
         }
 
         group
@@ -1087,7 +1065,7 @@ object GroupMetadataManager {
 
 }
 
-case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+case class DelayedStore(partitionRecords: Map[TopicPartition, MemoryRecords],
                         callback: Map[TopicPartition, PartitionResponse] => Unit)
 
 case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
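To make the new write path concrete, a rough Java sketch of the pattern GroupMetadataManager now follows: create Records with the partition's magic value and timestamp type, use a null value for tombstones, and wrap everything destined for one offsets-topic partition in a single MemoryRecords. The key/value bytes and the NONE compression below are placeholders, not the real offset-commit schemas or the configured offsets-topic codec.

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.TimestampType;

    // Illustrative only: the Record/MemoryRecords pattern used for offsets-topic writes above.
    public class OffsetsTopicEntrySketch {
        public static void main(String[] args) {
            // On the broker these come from getMagicAndTimestamp(partition); fixed values here.
            byte magic = Record.MAGIC_VALUE_V1;
            TimestampType timestampType = TimestampType.CREATE_TIME;
            long timestamp = System.currentTimeMillis();

            // Placeholder bytes stand in for the real offset-commit key/value encodings.
            byte[] key = "offset-commit-key-placeholder".getBytes();
            byte[] value = "offset-commit-value-placeholder".getBytes();

            // A stored entry: key identifies the group/topic/partition, value carries the offset metadata.
            Record commit = Record.create(magic, timestampType, timestamp, key, value);

            // A tombstone uses a null value, as the expiration path above does for expired offsets.
            Record tombstone = Record.create(magic, timestampType, timestamp, key, null);
            System.out.println(tombstone.hasKey() + " " + tombstone.hasNullValue());   // true true

            // One MemoryRecords per partition is handed to replicaManager.appendRecords
            // or partition.appendRecordsToLeader.
            MemoryRecords entries = MemoryRecords.withRecords(timestampType, CompressionType.NONE, commit, tombstone);
            System.out.println(entries.sizeInBytes() + " bytes");
        }
    }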

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
deleted file mode 100755
index 506f5b9..0000000
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.util.concurrent.atomic._
-import java.util.concurrent.TimeUnit
-
-import kafka.utils._
-import kafka.message._
-import kafka.common.KafkaException
-import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
-import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.record.FileRecords
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * An on-disk message set. An optional start and end position can be applied to the message set
- * which will allow slicing a subset of the file.
- * @param file The file name for the underlying log data
- * @param channel the underlying file channel used
- * @param start A lower bound on the absolute position in the file from which the message set begins
- * @param end The upper bound on the absolute position in the file at which the message set ends
- * @param isSlice Should the start and end parameters be used for slicing?
- */
-@nonthreadsafe
-class FileMessageSet private[kafka](@volatile var file: File,
-                                    private[log] val channel: FileChannel,
-                                    private[log] val start: Int,
-                                    private[log] val end: Int,
-                                    isSlice: Boolean) extends MessageSet {
-  /* the size of the message set in bytes */
-  private val _size =
-    if(isSlice)
-      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
-    else
-      new AtomicInteger(math.min(channel.size.toInt, end) - start)
-
-  /* if this is not a slice, update the file pointer to the end of the file */
-  if (!isSlice)
-    /* set the file position to the last byte in the file */
-    channel.position(math.min(channel.size.toInt, end))
-
-  /**
-   * Create a file message set with no slicing.
-   */
-  def this(file: File, channel: FileChannel) =
-    this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
-
-  /**
-   * Create a file message set with no slicing
-   */
-  def this(file: File) =
-    this(file, FileMessageSet.openChannel(file, mutable = true))
-
-  /**
-   * Create a file message set with no slicing, and with initFileSize and preallocate.
-   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
-   * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
-   * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
-   */
-  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
-      this(file,
-        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
-        start = 0,
-        end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
-        isSlice = false)
-
-  /**
-   * Create a file message set with mutable option
-   */
-  def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
-
-  /**
-   * Create a slice view of the file message set that begins and ends at the given byte offsets
-   */
-  def this(file: File, channel: FileChannel, start: Int, end: Int) =
-    this(file, channel, start, end, isSlice = true)
-
-  /**
-   * Return a message set which is a view into this set starting from the given position and with the given size limit.
-   *
-   * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
-   *
-   * If this message set is already sliced, the position will be taken relative to that slicing.
-   *
-   * @param position The start position to begin the read from
-   * @param size The number of bytes after the start position to include
-   *
-   * @return A sliced wrapper on this message set limited based on the given position and size
-   */
-  def read(position: Int, size: Int): FileMessageSet = {
-    if(position < 0)
-      throw new IllegalArgumentException("Invalid position: " + position)
-    if(size < 0)
-      throw new IllegalArgumentException("Invalid size: " + size)
-    new FileMessageSet(file,
-                       channel,
-                       start = this.start + position,
-                       end = {
-                         // Handle the integer overflow
-                         if (this.start + position + size < 0)
-                           sizeInBytes()
-                         else
-                           math.min(this.start + position + size, sizeInBytes())
-                       })
-  }
-
-  override def asRecords: FileRecords = new FileRecords(file, channel, start, end, isSlice)
-
-  /**
-   * Search forward for the file position of the last offset that is greater than or equal to the target offset
-   * and return its physical position and the size of the message (including log overhead) at the returned offset. If
-   * no such offsets are found, return null.
-   *
-   * @param targetOffset The offset to search for.
-   * @param startingPosition The starting position in the file to begin searching from.
-   */
-  def searchForOffsetWithSize(targetOffset: Long, startingPosition: Int): (OffsetPosition, Int) = {
-    var position = startingPosition
-    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
-    val size = sizeInBytes()
-    while(position + MessageSet.LogOverhead < size) {
-      buffer.rewind()
-      channel.read(buffer, position)
-      if(buffer.hasRemaining)
-        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
-          .format(targetOffset, startingPosition, file.getAbsolutePath))
-      buffer.rewind()
-      val offset = buffer.getLong()
-      val messageSize = buffer.getInt()
-      if (messageSize < Message.MinMessageOverhead)
-        throw new IllegalStateException("Invalid message size: " + messageSize)
-      if (offset >= targetOffset)
-        return (OffsetPosition(offset, position), messageSize + MessageSet.LogOverhead)
-      position += MessageSet.LogOverhead + messageSize
-    }
-    null
-  }
-
-  /**
-   * Search forward for the message whose timestamp is greater than or equals to the target timestamp.
-   *
-   * @param targetTimestamp The timestamp to search for.
-   * @param startingPosition The starting position to search.
-   * @return The timestamp and offset of the message found. None, if no message is found.
-   */
-  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[TimestampOffset] = {
-    val messagesToSearch = read(startingPosition, sizeInBytes)
-    for (messageAndOffset <- messagesToSearch) {
-      val message = messageAndOffset.message
-      if (message.timestamp >= targetTimestamp) {
-        // We found a message
-        message.compressionCodec match {
-          case NoCompressionCodec =>
-            return Some(TimestampOffset(messageAndOffset.message.timestamp, messageAndOffset.offset))
-          case _ =>
-            // Iterate over the inner messages to get the exact offset.
-            for (innerMessageAndOffset <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
-              val timestamp = innerMessageAndOffset.message.timestamp
-              if (timestamp >= targetTimestamp)
-                return Some(TimestampOffset(innerMessageAndOffset.message.timestamp, innerMessageAndOffset.offset))
-            }
-            throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset}" +
-                s" should contain target timestamp $targetTimestamp but it does not.")
-        }
-      }
-    }
-    None
-  }
-
-  /**
-   * Return the largest timestamp of the messages after a given position in this file message set.
-   * @param startingPosition The starting position.
-   * @return The largest timestamp of the messages after the given position.
-   */
-  def largestTimestampAfter(startingPosition: Int): TimestampOffset = {
-    var maxTimestamp = Message.NoTimestamp
-    var offsetOfMaxTimestamp = -1L
-    val messagesToSearch = read(startingPosition, Int.MaxValue)
-    for (messageAndOffset <- messagesToSearch) {
-      if (messageAndOffset.message.timestamp > maxTimestamp) {
-        maxTimestamp = messageAndOffset.message.timestamp
-        offsetOfMaxTimestamp = messageAndOffset.offset
-      }
-    }
-    TimestampOffset(maxTimestamp, offsetOfMaxTimestamp)
-  }
-
-  /**
-    * This method is called before we write messages to the socket using zero-copy transfer. We need to
-    * make sure all the messages in the message set have the expected magic value.
-    *
-    * @param expectedMagicValue the magic value expected
-    * @return true if all messages have expected magic value, false otherwise
-    */
-  override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
-    var location = start
-    val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
-    val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength)
-    while (location < end) {
-      offsetAndSizeBuffer.rewind()
-      channel.read(offsetAndSizeBuffer, location)
-      if (offsetAndSizeBuffer.hasRemaining)
-        return true
-      offsetAndSizeBuffer.rewind()
-      offsetAndSizeBuffer.getLong // skip offset field
-      val messageSize = offsetAndSizeBuffer.getInt
-      if (messageSize < Message.MinMessageOverhead)
-        throw new IllegalStateException("Invalid message size: " + messageSize)
-      crcAndMagicByteBuffer.rewind()
-      channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead)
-      if (crcAndMagicByteBuffer.get(Message.MagicOffset) != expectedMagicValue)
-        return false
-      location += (MessageSet.LogOverhead + messageSize)
-    }
-    true
-  }
-
-  /**
-   * Convert this message set to use the specified message format.
-   */
-  def toMessageFormat(toMagicValue: Byte): MessageSet = {
-    val offsets = new ArrayBuffer[Long]
-    val newMessages = new ArrayBuffer[Message]
-    this.foreach { messageAndOffset =>
-      val message = messageAndOffset.message
-      if (message.compressionCodec == NoCompressionCodec) {
-        newMessages += message.toFormatVersion(toMagicValue)
-        offsets += messageAndOffset.offset
-      } else {
-        // File message set only has shallow iterator. We need to do deep iteration here if needed.
-        val deepIter = ByteBufferMessageSet.deepIterator(messageAndOffset)
-        for (innerMessageAndOffset <- deepIter) {
-          newMessages += innerMessageAndOffset.message.toFormatVersion(toMagicValue)
-          offsets += innerMessageAndOffset.offset
-        }
-      }
-    }
-
-    if (sizeInBytes > 0 && newMessages.isEmpty) {
-      // This indicates that the message is too large. We just return all the bytes in the file message set.
-      this
-    } else {
-      // We use the offset seq to assign offsets so the offset of the messages does not change.
-      new ByteBufferMessageSet(
-        compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
-        offsetSeq = offsets,
-        newMessages: _*)
-    }
-  }
-
-  /**
-   * Get a shallow iterator over the messages in the set.
-   */
-  override def iterator: Iterator[MessageAndOffset] = iterator(Int.MaxValue)
-
-  /**
-   * Get an iterator over the messages in the set. We only do shallow iteration here.
-   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
-   * If we encounter a message larger than this we throw an InvalidMessageException.
-   * @return The iterator.
-   */
-  def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
-    new IteratorTemplate[MessageAndOffset] {
-      var location = start
-      val sizeOffsetLength = 12
-      val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength)
-
-      override def makeNext(): MessageAndOffset = {
-        if(location + sizeOffsetLength >= end)
-          return allDone()
-
-        // read the size of the item
-        sizeOffsetBuffer.rewind()
-        channel.read(sizeOffsetBuffer, location)
-        if(sizeOffsetBuffer.hasRemaining)
-          return allDone()
-
-        sizeOffsetBuffer.rewind()
-        val offset = sizeOffsetBuffer.getLong()
-        val size = sizeOffsetBuffer.getInt()
-        if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end)
-          return allDone()
-        if(size > maxMessageSize)
-          throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
-
-        // read the item itself
-        val buffer = ByteBuffer.allocate(size)
-        channel.read(buffer, location + sizeOffsetLength)
-        if(buffer.hasRemaining)
-          return allDone()
-        buffer.rewind()
-
-        // increment the location and return the item
-        location += size + sizeOffsetLength
-        MessageAndOffset(new Message(buffer), offset)
-      }
-    }
-  }
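A short sketch of how this shallow iterator was typically consumed; the file path and the 1 MB size limit are hypothetical, while the constructor, iterator(maxMessageSize) and MessageSet.entrySize calls all appear elsewhere in this patch:

    // hypothetical: shallow scan of a segment file, bounding per-message allocation to 1 MB
    val segmentFile = new File("/tmp/kafka-logs/demo-0/00000000000000000000.log")
    val messageSet = new FileMessageSet(file = segmentFile)
    for (messageAndOffset <- messageSet.iterator(maxMessageSize = 1024 * 1024))
      println(s"offset=${messageAndOffset.offset} size=${MessageSet.entrySize(messageAndOffset.message)} bytes")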
-
-  /**
-   * The number of bytes taken up by this file set
-   */
-  def sizeInBytes(): Int = _size.get()
-
-  /**
-   * Append these messages to the message set
-   */
-  def append(messages: ByteBufferMessageSet) {
-    val written = messages.writeFullyTo(channel)
-    _size.getAndAdd(written)
-  }
-
-  /**
-   * Commit all written data to the physical disk
-   */
-  def flush() = {
-    channel.force(true)
-  }
-
-  /**
-   * Close this message set
-   */
-  def close() {
-    flush()
-    trim()
-    channel.close()
-  }
-
-  /**
-   * Trim the file when closing or rolling to the next file
-   */
-  def trim() {
-    truncateTo(sizeInBytes())
-  }
-
-  /**
-   * Delete this message set from the filesystem
-   * @return True iff this message set was deleted.
-   */
-  def delete(): Boolean = {
-    CoreUtils.swallow(channel.close())
-    file.delete()
-  }
-
-  /**
-   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
-   * given size falls on a valid message boundary.
-   * In some versions of the JDK truncating to the same size as the file message set will cause an
-   * update of the file's mtime, so truncate is only performed if the targetSize is smaller than the
-   * size of the underlying FileChannel.
-   * It is expected that no other threads will do writes to the log when this function is called.
-   * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
-   * @return The number of bytes truncated off
-   */
-  def truncateTo(targetSize: Int): Int = {
-    val originalSize = sizeInBytes
-    if(targetSize > originalSize || targetSize < 0)
-      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
-                               " size of this log segment is " + originalSize + " bytes.")
-    if (targetSize < channel.size.toInt) {
-      channel.truncate(targetSize)
-      channel.position(targetSize)
-      _size.set(targetSize)
-    }
-    originalSize - targetSize
-  }
-
-  /**
-   * Read from the underlying file into the buffer starting at the given position
-   */
-  def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
-    channel.read(buffer, relativePosition + this.start)
-    buffer.flip()
-    buffer
-  }
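This read-a-chunk-and-wrap pattern survives the refactor: the log cleaner further down reads into a buffer and wraps it without copying. A minimal sketch, assuming a segment whose log is the new records-backed type and assuming its readInto keeps the fill-then-flip contract shown above:

    // hypothetical: read a slice of a segment into a buffer and wrap it without copying
    val readBuffer = ByteBuffer.allocate(512 * 1024)
    segment.log.readInto(readBuffer, 0)
    val records = MemoryRecords.readableRecords(readBuffer)
    println(s"read ${records.sizeInBytes} bytes, ${records.validBytes} of them valid")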
-
-  /**
-   * Rename the file that backs this message set
-   * @throws IOException if rename fails.
-   */
-  def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally this.file = f
-  }
-
-}
-
-object FileMessageSet extends Logging {
-  // preserve the previous logger name after moving the logging aspect from FileMessageSet to its companion object
-  override val loggerName = classOf[FileMessageSet].getName
-
-  /**
-   * Open a channel for the given file.
-   * For Windows NTFS and some older Linux filesystems, setting preallocate to true and initFileSize
-   * to a value such as 512 * 1025 * 1024 can improve Kafka produce performance.
-   * @param file File path
-   * @param mutable Whether the channel should be opened for writing
-   * @param fileAlreadyExists Whether the file already exists
-   * @param initFileSize The size used to preallocate the file, for example 512 * 1025 * 1024
-   * @param preallocate Whether to preallocate the file, taken from the configuration.
-   */
-  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
-    if (mutable) {
-      if (fileAlreadyExists)
-        new RandomAccessFile(file, "rw").getChannel()
-      else {
-        if (preallocate) {
-          val randomAccessFile = new RandomAccessFile(file, "rw")
-          randomAccessFile.setLength(initFileSize)
-          randomAccessFile.getChannel()
-        }
-        else
-          new RandomAccessFile(file, "rw").getChannel()
-      }
-    }
-    else
-      new FileInputStream(file).getChannel()
-  }
-}
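A minimal sketch of the preallocation path described in the comment above; the file path and size are hypothetical, the parameter names come from the signature as written:

    // hypothetical: open a writable, preallocated channel for a new segment file
    val channel = FileMessageSet.openChannel(
      file = new File("/tmp/kafka-logs/demo-0/00000000000000000000.log"),
      mutable = true,
      fileAlreadyExists = false,
      initFileSize = 512 * 1024 * 1024,
      preallocate = true)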
-
-object LogFlushStats extends KafkaMetricsGroup {
-  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6acc8d2..d58a066 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import kafka.api.KAFKA_0_10_0_IV0
 import kafka.utils._
-import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
@@ -29,16 +28,18 @@ import java.util.concurrent.atomic._
 import java.text.NumberFormat
 
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ListOffsetRequest
 
 import scala.collection.Seq
 import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP,
+    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -243,7 +244,7 @@ class Log(@volatile var dir: File,
       val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
       val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
-      val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
+      val swapSegment = new LogSegment(FileRecords.open(swapFile),
                                        index = index,
                                        timeIndex = timeIndex,
                                        baseOffset = startOffset,
@@ -338,20 +339,20 @@ class Log(@volatile var dir: File,
    * This method will generally be responsible for assigning offsets to the messages,
    * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
    *
-   * @param messages The message set to append
+   * @param records The log records to append
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and last offset.
    */
-  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
-    val appendInfo = analyzeAndValidateMessageSet(messages)
+  def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
+    val appendInfo = analyzeAndValidateRecords(records)
 
     // if we have any valid messages, append them to the log
     if (appendInfo.shallowCount == 0)
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
-    var validMessages = trimInvalidBytes(messages, appendInfo)
+    var validRecords = trimInvalidBytes(records, appendInfo)
 
     try {
       // they are valid, insert them in the log
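For orientation, a sketch of the new entry point: the log value and producerRequestBuffer (a ByteBuffer already holding records in the client wire format) are hypothetical, while MemoryRecords.readableRecords and append(records, assignOffsets) are taken from this hunk:

    // hypothetical: wrap an already-encoded buffer and let the log assign offsets
    val records = MemoryRecords.readableRecords(producerRequestBuffer)
    val appendInfo = log.append(records, assignOffsets = true)
    println(s"appended offsets ${appendInfo.firstOffset}..${appendInfo.lastOffset}")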
@@ -363,20 +364,21 @@ class Log(@volatile var dir: File,
           appendInfo.firstOffset = offset.value
           val now = time.milliseconds
           val validateAndOffsetAssignResult = try {
-            validMessages.validateMessagesAndAssignOffsets(offset,
-                                                           now,
-                                                           appendInfo.sourceCodec,
-                                                           appendInfo.targetCodec,
-                                                           config.compact,
-                                                           config.messageFormatVersion.messageFormatVersion,
-                                                           config.messageTimestampType,
-                                                           config.messageTimestampDifferenceMaxMs)
+            LogValidator.validateMessagesAndAssignOffsets(validRecords,
+                                                          offset,
+                                                          now,
+                                                          appendInfo.sourceCodec,
+                                                          appendInfo.targetCodec,
+                                                          config.compact,
+                                                          config.messageFormatVersion.messageFormatVersion,
+                                                          config.messageTimestampType,
+                                                          config.messageTimestampDifferenceMaxMs)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
-          validMessages = validateAndOffsetAssignResult.validatedMessages
+          validRecords = validateAndOffsetAssignResult.validatedRecords
           appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
-          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp
+          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
           appendInfo.lastOffset = offset.value - 1
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
             appendInfo.logAppendTime = now
@@ -384,14 +386,14 @@ class Log(@volatile var dir: File,
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
           if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
-            for (messageAndOffset <- validMessages.shallowIterator) {
-              if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+            for (logEntry <- validRecords.shallowIterator.asScala) {
+              if (logEntry.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
                 // to be consistent with pre-compression bytesRejectedRate recording
-                BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
-                BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+                BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+                BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                 throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
-                  .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+                  .format(logEntry.sizeInBytes, config.maxMessageSize))
               }
             }
           }
@@ -399,28 +401,27 @@ class Log(@volatile var dir: File,
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException("Out of order offsets found in " + messages)
+            throw new IllegalArgumentException("Out of order offsets found in " + records.deepIterator.asScala.map(_.offset))
         }
 
         // check whether the message set size exceeds config.segmentSize
-        if (validMessages.sizeInBytes > config.segmentSize) {
+        if (validRecords.sizeInBytes > config.segmentSize) {
           throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
-            .format(validMessages.sizeInBytes, config.segmentSize))
+            .format(validRecords.sizeInBytes, config.segmentSize))
         }
 
         // maybe roll the log if this segment is full
-        val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
-                                maxTimestampInMessages = appendInfo.maxTimestamp)
+        val segment = maybeRoll(messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp)
 
         // now append to the log
         segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
-          offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages)
+          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords)
 
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
 
         trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
-          .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
+          .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
 
         if (unflushedMessages >= config.flushInterval)
           flush()
@@ -449,73 +450,74 @@ class Log(@volatile var dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset, lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
-    var maxTimestamp = Message.NoTimestamp
+    var maxTimestamp = Record.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
-    for(messageAndOffset <- messages.shallowIterator) {
+    for (entry <- records.shallowIterator.asScala) {
       // update the first offset if on the first message
       if(firstOffset < 0)
-        firstOffset = messageAndOffset.offset
+        firstOffset = entry.offset
       // check that offsets are monotonically increasing
-      if(lastOffset >= messageAndOffset.offset)
+      if(lastOffset >= entry.offset)
         monotonic = false
       // update the last offset seen
-      lastOffset = messageAndOffset.offset
+      lastOffset = entry.offset
 
-      val m = messageAndOffset.message
+      val record = entry.record
 
       // Check if the message sizes are valid.
-      val messageSize = MessageSet.entrySize(m)
+      val messageSize = entry.sizeInBytes
       if(messageSize > config.maxMessageSize) {
-        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
         throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
           .format(messageSize, config.maxMessageSize))
       }
 
       // check the validity of the message by checking CRC
-      m.ensureValid()
-      if (m.timestamp > maxTimestamp) {
-        maxTimestamp = m.timestamp
+      record.ensureValid()
+      if (record.timestamp > maxTimestamp) {
+        maxTimestamp = record.timestamp
         offsetOfMaxTimestamp = lastOffset
       }
       shallowMessageCount += 1
       validBytesCount += messageSize
 
-      val messageCodec = m.compressionCodec
-      if(messageCodec != NoCompressionCodec)
+      val messageCodec = CompressionCodec.getCompressionCodec(record.compressionType.id)
+      if (messageCodec != NoCompressionCodec)
         sourceCodec = messageCodec
     }
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
-    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Record.NO_TIMESTAMP, sourceCodec,
+      targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
    *
-   * @param messages The message set to trim
+   * @param records The records to trim
    * @param info The general information of the message set
    * @return A trimmed message set. This may be the same as what was passed in or it may not.
    */
-  private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
-    val messageSetValidBytes = info.validBytes
-    if(messageSetValidBytes < 0)
-      throw new CorruptRecordException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
-    if(messageSetValidBytes == messages.sizeInBytes) {
-      messages
+  private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
+    val validBytes = info.validBytes
+    if (validBytes < 0)
+      throw new CorruptRecordException("Illegal length of message set " + validBytes + ". Message set cannot be appended to log. Possible causes are corrupted produce requests.")
+    if (validBytes == records.sizeInBytes) {
+      records
     } else {
       // trim invalid bytes
-      val validByteBuffer = messages.buffer.duplicate()
-      validByteBuffer.limit(messageSetValidBytes)
-      new ByteBufferMessageSet(validByteBuffer)
+      val validByteBuffer = records.buffer.duplicate()
+      validByteBuffer.limit(validBytes)
+      MemoryRecords.readableRecords(validByteBuffer)
     }
   }
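Both methods above lean on the shallow/deep distinction that this patch carries over from the old message sets. A small illustrative sketch, assuming a MemoryRecords value named records and the JavaConverters import already present in this file:

    // shallow entries are the wrapper messages as stored; deep iteration also yields
    // the records inside compressed wrappers
    for (shallowEntry <- records.shallowIterator.asScala)
      println(s"wrapper at offset ${shallowEntry.offset}: ${shallowEntry.sizeInBytes} bytes")
    for (deepEntry <- records.deepIterator.asScala)
      println(s"record at offset ${deepEntry.offset}, timestamp ${deepEntry.record.timestamp}")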
 
@@ -538,7 +540,7 @@ class Log(@volatile var dir: File,
     val currentNextOffsetMetadata = nextOffsetMetadata
     val next = currentNextOffsetMetadata.messageOffset
     if(startOffset == next)
-      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
+      return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
 
     var entry = segments.floorEntry(startOffset)
 
@@ -578,7 +580,7 @@ class Log(@volatile var dir: File,
     // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
     // this can happen when all messages with offset larger than start offsets have been deleted.
     // In this case, we will return the empty set with log end offset metadata
-    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
+    FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
   }
 
   /**
@@ -610,9 +612,9 @@ class Log(@volatile var dir: File,
     val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(Message.NoTimestamp, segmentsCopy.head.baseOffset))
+        return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-        return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
+        return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset))
 
     val targetSeg = {
       // Get all the segments whose largest timestamp is smaller than target timestamp
@@ -656,7 +658,7 @@ class Log(@volatile var dir: File,
         if (segments.size == numToDelete)
           roll()
         // remove the segments for lookups
-        deletable.foreach(deleteSegment(_))
+        deletable.foreach(deleteSegment)
       }
       numToDelete
     }
@@ -865,7 +867,7 @@ class Log(@volatile var dir: File,
         truncateFullyAndStartAt(targetOffset)
       } else {
         val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
-        deletable.foreach(deleteSegment(_))
+        deletable.foreach(deleteSegment)
         activeSegment.truncateTo(targetOffset)
         updateLogEndOffset(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
@@ -882,7 +884,7 @@ class Log(@volatile var dir: File,
     debug("Truncate and start log '" + name + "' to " + newOffset)
     lock synchronized {
       val segmentsToDelete = logSegments.toList
-      segmentsToDelete.foreach(deleteSegment(_))
+      segmentsToDelete.foreach(deleteSegment)
       addSegment(new LogSegment(dir,
                                 newOffset,
                                 indexIntervalBytes = config.indexInterval,

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4a76b0c..c5a73d5 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,20 +17,21 @@
 
 package kafka.log
 
-import java.io.{DataOutputStream, File}
+import java.io.File
 import java.nio._
 import java.util.Date
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
 import kafka.common._
-import kafka.message._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.apache.kafka.common.record.{FileRecords, LogEntry, MemoryRecords}
 import org.apache.kafka.common.utils.Time
+import MemoryRecords.LogEntryFilter
 
-import scala.Iterable
 import scala.collection._
+import JavaConverters._
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -390,10 +391,10 @@ private[log] class Cleaner(val id: Int,
     val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
     indexFile.delete()
     timeIndexFile.delete()
-    val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
+    val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate)
     val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
     val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
-    val cleaned = new LogSegment(messages, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+    val cleaned = new LogSegment(records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
 
     try {
       // clean segments into the new destination segment
@@ -449,8 +450,12 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              stats: CleanerStats) {
-    def shouldRetain(messageAndOffset: MessageAndOffset): Boolean =
-      shouldRetainMessage(source, map, retainDeletes, messageAndOffset, stats)
+    def shouldRetainEntry(logEntry: LogEntry): Boolean =
+      shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
+
+    class LogCleanerFilter extends LogEntryFilter {
+      def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainEntry(logEntry)
+    }
 
     var position = 0
     while (position < source.log.sizeInBytes) {
@@ -460,10 +465,9 @@ private[log] class Cleaner(val id: Int,
       writeBuffer.clear()
 
       source.log.readInto(readBuffer, position)
-      val messages = new ByteBufferMessageSet(readBuffer)
-      throttler.maybeThrottle(messages.sizeInBytes)
-      val result = messages.filterInto(writeBuffer, shouldRetain)
-
+      val records = MemoryRecords.readableRecords(readBuffer)
+      throttler.maybeThrottle(records.sizeInBytes)
+      val result = records.filterTo(new LogCleanerFilter, writeBuffer)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
 
@@ -472,9 +476,10 @@ private[log] class Cleaner(val id: Int,
       // if any messages are to be retained, write them out
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
-        val retained = new ByteBufferMessageSet(writeBuffer)
-        dest.append(firstOffset = retained.head.offset, largestTimestamp = result.maxTimestamp,
-          offsetOfLargestTimestamp = result.offsetOfMaxTimestamp, messages = retained)
+
+        val retained = MemoryRecords.readableRecords(writeBuffer)
+        dest.append(firstOffset = retained.deepIterator().next().offset, largestTimestamp = result.maxTimestamp,
+          shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained)
         throttler.maybeThrottle(writeBuffer.limit)
       }
       
@@ -488,21 +493,22 @@ private[log] class Cleaner(val id: Int,
   private def shouldRetainMessage(source: kafka.log.LogSegment,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
-                                  entry: kafka.message.MessageAndOffset,
+                                  entry: LogEntry,
                                   stats: CleanerStats): Boolean = {
     val pastLatestOffset = entry.offset > map.latestOffset
     if (pastLatestOffset)
       return true
 
-    val key = entry.message.key
-    if (key != null) {
+
+    if (entry.record.hasKey) {
+      val key = entry.record.key
       val foundOffset = map.get(key)
       /* two cases in which we can get rid of a message:
        *   1) if there exists a message with the same key but higher offset
        *   2) if the message is a delete "tombstone" marker and enough time has passed
        */
       val redundant = foundOffset >= 0 && entry.offset < foundOffset
-      val obsoleteDelete = !retainDeletes && entry.message.isNull
+      val obsoleteDelete = !retainDeletes && entry.record.hasNullValue
       !redundant && !obsoleteDelete
     } else {
       stats.invalidMessage()
@@ -620,12 +626,12 @@ private[log] class Cleaner(val id: Int,
       checkDone(topicAndPartition)
       readBuffer.clear()
       segment.log.readInto(readBuffer, position)
-      val messages = new ByteBufferMessageSet(readBuffer)
-      throttler.maybeThrottle(messages.sizeInBytes)
+      val records = MemoryRecords.readableRecords(readBuffer)
+      throttler.maybeThrottle(records.sizeInBytes)
 
       val startPosition = position
-      for (entry <- messages) {
-        val message = entry.message
+      for (entry <- records.deepIterator.asScala) {
+        val message = entry.record
         if (message.hasKey && entry.offset >= start) {
           if (map.size < maxDesiredMapSize)
             map.put(message.key, entry.offset)
@@ -634,8 +640,9 @@ private[log] class Cleaner(val id: Int,
         }
         stats.indexMessagesRead(1)
       }
-      position += messages.validBytes
-      stats.indexBytesRead(messages.validBytes)
+      val bytesRead = records.validBytes
+      position += bytesRead
+      stats.indexBytesRead(bytesRead)
 
       // if we didn't read even one complete message, our read buffer may be too small
       if(position == startPosition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ed79946..953fca4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -440,7 +440,7 @@ class LogManager(val logDirs: Array[File],
         removedLog.dir = renamedDir
         // change the file pointers for log and index file
         for (logSegment <- removedLog.logSegments) {
-          logSegment.log.file = new File(renamedDir, logSegment.log.file.getName)
+          logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
           logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
         }