You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:33 UTC
[6/9] kafka git commit: KAFKA-4390;
Replace MessageSet usage with client-side alternatives
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 6482529..551d820 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -35,18 +35,22 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(value = Parameterized.class)
public class RecordTest {
+ private byte magic;
private long timestamp;
private ByteBuffer key;
private ByteBuffer value;
private CompressionType compression;
+ private TimestampType timestampType;
private Record record;
- public RecordTest(long timestamp, byte[] key, byte[] value, CompressionType compression) {
+ public RecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
+ this.magic = magic;
this.timestamp = timestamp;
+ this.timestampType = TimestampType.CREATE_TIME;
this.key = key == null ? null : ByteBuffer.wrap(key);
this.value = value == null ? null : ByteBuffer.wrap(value);
this.compression = compression;
- this.record = new Record(timestamp, key, value, compression);
+ this.record = Record.create(magic, timestamp, key, value, compression, timestampType);
}
@Test
@@ -56,22 +60,33 @@ public class RecordTest {
assertEquals(key, record.key());
if (key != null)
assertEquals(key.limit(), record.keySize());
- assertEquals(Record.CURRENT_MAGIC_VALUE, record.magic());
+ assertEquals(magic, record.magic());
assertEquals(value, record.value());
if (value != null)
assertEquals(value.limit(), record.valueSize());
+ if (magic > 0) {
+ assertEquals(timestamp, record.timestamp());
+ assertEquals(timestampType, record.timestampType());
+ } else {
+ assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+ assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+ }
}
@Test
public void testChecksum() {
assertEquals(record.checksum(), record.computeChecksum());
+
+ byte attributes = Record.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
assertEquals(record.checksum(), Record.computeChecksum(
- this.timestamp,
- this.key == null ? null : this.key.array(),
- this.value == null ? null : this.value.array(),
- this.compression, 0, -1));
+ magic,
+ attributes,
+ this.timestamp,
+ this.key == null ? null : this.key.array(),
+ this.value == null ? null : this.value.array()
+ ));
assertTrue(record.isValid());
- for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
+ for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.sizeInBytes(); i++) {
Record copy = copyOf(record);
copy.buffer().put(i, (byte) 69);
assertFalse(copy.isValid());
@@ -85,7 +100,7 @@ public class RecordTest {
}
private Record copyOf(Record record) {
- ByteBuffer buffer = ByteBuffer.allocate(record.size());
+ ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
record.buffer().put(buffer);
buffer.rewind();
record.buffer().rewind();
@@ -101,12 +116,13 @@ public class RecordTest {
public static Collection<Object[]> data() {
byte[] payload = new byte[1000];
Arrays.fill(payload, (byte) 1);
- List<Object[]> values = new ArrayList<Object[]>();
- for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
- for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
- for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
- for (CompressionType compression : CompressionType.values())
- values.add(new Object[] {timestamp, key, value, compression});
+ List<Object[]> values = new ArrayList<>();
+ for (byte magic : Arrays.asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+ for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
+ for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+ for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
+ for (CompressionType compression : CompressionType.values())
+ values.add(new Object[] {magic, timestamp, key, value, compression});
return values;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index aabadfe..427c743 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -20,35 +20,29 @@ import org.junit.Test;
import java.nio.ByteBuffer;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
public class SimpleRecordTest {
/* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
- @Test
+ @Test(expected = InvalidRecordException.class)
public void testIsValidWithTooSmallBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(2);
Record record = new Record(buffer);
assertFalse(record.isValid());
- try {
- record.ensureValid();
- fail("InvalidRecordException should have been thrown");
- } catch (InvalidRecordException e) { }
+ record.ensureValid();
}
- @Test
+ @Test(expected = InvalidRecordException.class)
public void testIsValidWithChecksumMismatch() {
ByteBuffer buffer = ByteBuffer.allocate(4);
// set checksum
buffer.putInt(2);
Record record = new Record(buffer);
assertFalse(record.isValid());
- try {
- record.ensureValid();
- fail("InvalidRecordException should have been thrown");
- } catch (InvalidRecordException e) { }
+ record.ensureValid();
}
@Test
@@ -63,4 +57,40 @@ public class SimpleRecordTest {
record.ensureValid();
}
+ /**
+ * Up-converts V0 records (covering non-empty, empty and null keys/values) to V1 and checks
+ * that the result reports the V1 magic, has no timestamp, keeps key and value, passes
+ * validation, and has the size predicted by convertedSize.
+ */
+ @Test
+ public void testConvertFromV0ToV1() {
+ byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
+ byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
+
+ // keys[i] is paired with values[i]
+ for (int i = 0; i < keys.length; i++) {
+ Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]);
+ Record converted = record.convert(Record.MAGIC_VALUE_V1);
+
+ assertEquals(Record.MAGIC_VALUE_V1, converted.magic());
+ assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+ assertEquals(record.key(), converted.key());
+ assertEquals(record.value(), converted.value());
+ assertTrue(record.isValid());
+ // the converted copy is the object under test, so it has to be validated as well
+ assertTrue(converted.isValid());
+ assertEquals(record.convertedSize(Record.MAGIC_VALUE_V1), converted.sizeInBytes());
+ }
+ }
+
+ /**
+ * Down-converts V1 records (covering non-empty, empty and null keys/values) to V0 and checks
+ * that the result reports the V0 magic, no longer exposes a timestamp, keeps key and value,
+ * passes validation, and has the size predicted by convertedSize.
+ */
+ @Test
+ public void testConvertFromV1ToV0() {
+ byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
+ byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
+
+ // keys[i] is paired with values[i]
+ for (int i = 0; i < keys.length; i++) {
+ Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]);
+ Record converted = record.convert(Record.MAGIC_VALUE_V0);
+
+ assertEquals(Record.MAGIC_VALUE_V0, converted.magic());
+ // the creation time given above is expected to be gone after the down-conversion
+ assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+ assertEquals(record.key(), converted.key());
+ assertEquals(record.value(), converted.value());
+ assertTrue(record.isValid());
+ // the converted copy is the object under test, so it has to be validated as well
+ assertTrue(converted.isValid());
+ assertEquals(record.convertedSize(Record.MAGIC_VALUE_V0), converted.sizeInBytes());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
new file mode 100644
index 0000000..4759715
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampTypeTest {
+
+ /** CREATE_TIME written into an attributes byte must be read back as CREATE_TIME. */
+ @Test
+ public void toAndFromAttributesCreateTime() {
+ assertSurvivesRoundTrip(TimestampType.CREATE_TIME);
+ }
+
+ /** LOG_APPEND_TIME written into an attributes byte must be read back as LOG_APPEND_TIME. */
+ @Test
+ public void toAndFromAttributesLogAppendTime() {
+ assertSurvivesRoundTrip(TimestampType.LOG_APPEND_TIME);
+ }
+
+ // Applies the given type to an all-zero attributes byte and checks that decoding returns it.
+ private static void assertSurvivesRoundTrip(TimestampType expected) {
+ final byte encoded = expected.updateAttributes((byte) 0);
+ assertEquals(expected, TimestampType.forAttributes(encoded));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index d3280e5..4e80b61 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -23,8 +23,10 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;
import javax.xml.bind.DatatypeConverter;
@@ -35,6 +37,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -46,6 +49,7 @@ import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -185,13 +189,13 @@ public class TestUtils {
public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
int bufferSize = 0;
for (final Record record : records)
- bufferSize += Records.LOG_OVERHEAD + record.size();
+ bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- final MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
+ long nextOffset = offset;
for (final Record record : records)
- memoryRecords.append(offset, record);
- memoryRecords.close();
- return memoryRecords.buffer();
+ builder.append(nextOffset++, record);
+ return builder.build().buffer();
}
public static Properties producerConfig(final String bootstrapServers,
@@ -309,4 +313,22 @@ public class TestUtils {
fail(clusterId + " cannot be converted back to UUID.");
}
}
+
+ /**
+ * Fail with a JUnit assertion error if the two iterators are of differing lengths or
+ * yield unequal elements at the same position. Both iterators are advanced by this check.
+ */
+ public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) {
+ while (s1.hasNext() && s2.hasNext())
+ assertEquals(s1.next(), s2.next());
+ assertFalse("Iterators have uneven length--first has more", s1.hasNext());
+ assertFalse("Iterators have uneven length--second has more", s2.hasNext());
+ }
+
+ /**
+ * Drain the given iterator into a newly allocated list, keeping iteration order.
+ *
+ * @param iterator source of elements; it is exhausted when this method returns
+ * @return a new list holding every element the iterator produced
+ */
+ public static <T> List<T> toList(Iterator<T> iterator) {
+ final List<T> drained = new ArrayList<>();
+ while (iterator.hasNext()) {
+ final T element = iterator.next();
+ drained.add(element);
+ }
+ return drained;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 895c1b1..4052639 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
package kafka.api
-import kafka.message.Message
+import org.apache.kafka.common.record.Record
/**
* This class contains the different Kafka versions.
@@ -87,54 +87,54 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
// Keep the IDs in order of versions
case object KAFKA_0_8_0 extends ApiVersion {
val version: String = "0.8.0.X"
- val messageFormatVersion: Byte = Message.MagicValue_V0
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
val id: Int = 0
}
case object KAFKA_0_8_1 extends ApiVersion {
val version: String = "0.8.1.X"
- val messageFormatVersion: Byte = Message.MagicValue_V0
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
val id: Int = 1
}
case object KAFKA_0_8_2 extends ApiVersion {
val version: String = "0.8.2.X"
- val messageFormatVersion: Byte = Message.MagicValue_V0
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
val id: Int = 2
}
case object KAFKA_0_9_0 extends ApiVersion {
val version: String = "0.9.0.X"
- val messageFormatVersion: Byte = Message.MagicValue_V0
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
val id: Int = 3
}
case object KAFKA_0_10_0_IV0 extends ApiVersion {
val version: String = "0.10.0-IV0"
- val messageFormatVersion: Byte = Message.MagicValue_V1
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
val id: Int = 4
}
case object KAFKA_0_10_0_IV1 extends ApiVersion {
val version: String = "0.10.0-IV1"
- val messageFormatVersion: Byte = Message.MagicValue_V1
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
val id: Int = 5
}
case object KAFKA_0_10_1_IV0 extends ApiVersion {
val version: String = "0.10.1-IV0"
- val messageFormatVersion: Byte = Message.MagicValue_V1
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
val id: Int = 6
}
case object KAFKA_0_10_1_IV1 extends ApiVersion {
val version: String = "0.10.1-IV1"
- val messageFormatVersion: Byte = Message.MagicValue_V1
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
val id: Int = 7
}
case object KAFKA_0_10_1_IV2 extends ApiVersion {
val version: String = "0.10.1-IV2"
- val messageFormatVersion: Byte = Message.MagicValue_V1
+ val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
val id: Int = 8
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7e52a91..9eb92cd 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -25,7 +25,6 @@ import kafka.log.LogConfig
import kafka.server._
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
-import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -34,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.PartitionState
import org.apache.kafka.common.utils.Time
@@ -190,7 +190,7 @@ class Partition(val topic: String,
allReplicas.foreach(replica => getOrCreateReplica(replica))
val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
// remove assigned replicas that have been removed by the controller
- (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
+ (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica)
inSyncReplicas = newInSyncReplicas
leaderEpoch = partitionStateInfo.leaderEpoch
zkVersion = partitionStateInfo.zkVersion
@@ -440,7 +440,7 @@ class Partition(val topic: String,
laggingReplicas
}
- def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
+ def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
@@ -455,7 +455,7 @@ class Partition(val topic: String,
.format(topic, partitionId, inSyncSize, minIsr))
}
- val info = log.append(messages, assignOffsets = true)
+ val info = log.append(records, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
@@ -480,7 +480,7 @@ class Partition(val topic: String,
newLeaderAndIsr, controllerEpoch, zkVersion)
if(updateSucceeded) {
- replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
+ replicaManager.recordIsrChange(TopicAndPartition(topic, partitionId))
inSyncReplicas = newIsr
zkVersion = newVersion
trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c47efb7..f702b9d 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -17,14 +17,16 @@
package kafka.consumer
-import kafka.api.{OffsetRequest, Request, FetchRequestBuilder, FetchResponsePartitionData}
+import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
import kafka.cluster.BrokerEndPoint
import kafka.message.ByteBufferMessageSet
-import kafka.server.{PartitionFetchState, AbstractFetcherThread}
+import kafka.server.{AbstractFetcherThread, PartitionFetchState}
import kafka.common.{ErrorMapping, TopicAndPartition}
+
import scala.collection.Map
import ConsumerFetcherThread._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.MemoryRecords
class ConsumerFetcherThread(name: String,
val config: ConsumerConfig,
@@ -81,7 +83,7 @@ class ConsumerFetcherThread(name: String,
case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
case _ => OffsetRequest.LatestTime
}
- val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition)
+ val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
val pti = partitionMap(topicPartition)
pti.resetFetchOffset(newOffset)
@@ -123,7 +125,7 @@ object ConsumerFetcherThread {
class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
def errorCode: Short = underlying.error
- def toByteBufferMessageSet: ByteBufferMessageSet = underlying.messages.asInstanceOf[ByteBufferMessageSet]
+ def toRecords: MemoryRecords = underlying.messages.asInstanceOf[ByteBufferMessageSet].asRecords
def highWatermark: Long = underlying.hw
def exception: Option[Throwable] =
if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode))
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 0c53345..db40482 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -458,14 +458,14 @@ class GroupCoordinator(val brokerId: Int,
def handleFetchOffsets(groupId: String,
partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
if (!isActive.get) {
- partitions.map { case topicPartition =>
+ partitions.map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
} else if (!isCoordinatorForGroup(groupId)) {
debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
- partitions.map { case topicPartition =>
+ partitions.map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
} else if (isCoordinatorLoadingInProgress(groupId)) {
- partitions.map { case topicPartition =>
+ partitions.map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
} else {
// return offsets blindly regardless the current group state since the group may be using
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e55bcaa..a97b527 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -17,38 +17,31 @@
package kafka.coordinator
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
-import org.apache.kafka.common.protocol.types.Type.STRING
-import org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING
-import org.apache.kafka.common.protocol.types.Type.INT32
-import org.apache.kafka.common.protocol.types.Type.INT64
-import org.apache.kafka.common.protocol.types.Type.BYTES
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import kafka.utils._
-import kafka.common._
-import kafka.message._
-import kafka.log.FileMessageSet
-import kafka.metrics.KafkaMetricsGroup
-import kafka.common.TopicAndPartition
-import kafka.common.MessageFormatter
-import kafka.server.ReplicaManager
-
-import scala.collection._
import java.io.PrintStream
import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.common.{MessageFormatter, TopicAndPartition, _}
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
+import kafka.utils._
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.types.Type._
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.collection.JavaConverters._
+import scala.collection._
class GroupMetadataManager(val brokerId: Int,
val interBrokerProtocolVersion: ApiVersion,
@@ -57,6 +50,8 @@ class GroupMetadataManager(val brokerId: Int,
zkUtils: ZkUtils,
time: Time) extends Logging with KafkaMetricsGroup {
+ private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
+
private val groupMetadataCache = new Pool[String, GroupMetadata]
/* lock protecting access to loading and owned partition sets */
@@ -135,13 +130,11 @@ class GroupMetadataManager(val brokerId: Int,
}
}
-
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Option[DelayedStore] = {
- val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
- magicValueAndTimestampOpt match {
- case Some((magicValue, timestamp)) =>
+ getMagicAndTimestamp(partitionFor(group.groupId)) match {
+ case Some((magicValue, timestampType, timestamp)) =>
val groupMetadataValueVersion = {
if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
0.toShort
@@ -149,17 +142,12 @@ class GroupMetadataManager(val brokerId: Int,
GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
}
- val message = new Message(
- key = GroupMetadataManager.groupMetadataKey(group.groupId),
- bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
- timestamp = timestamp,
- magicValue = magicValue)
+ val record = Record.create(magicValue, timestampType, timestamp,
+ GroupMetadataManager.groupMetadataKey(group.groupId),
+ GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion))
val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
-
- val groupMetadataMessageSet = Map(groupMetadataPartition ->
- new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
-
+ val groupMetadataRecords = Map(groupMetadataPartition -> MemoryRecords.withRecords(timestampType, compressionType, record))
val generationId = group.generationId
// set the callback function to insert the created group into cache after log append completed
@@ -212,7 +200,7 @@ class GroupMetadataManager(val brokerId: Int,
responseCallback(responseError)
}
- Some(DelayedStore(groupMetadataMessageSet, putCacheCallback))
+ Some(DelayedStore(groupMetadataRecords, putCacheCallback))
case None =>
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
@@ -222,11 +210,11 @@ class GroupMetadataManager(val brokerId: Int,
def store(delayedStore: DelayedStore) {
// call replica manager to append the group message
- replicaManager.appendMessages(
+ replicaManager.appendRecords(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
- delayedStore.messageSet,
+ delayedStore.partitionRecords,
delayedStore.callback)
}
@@ -244,22 +232,17 @@ class GroupMetadataManager(val brokerId: Int,
}
// construct the message set to append
- val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
- magicValueAndTimestampOpt match {
- case Some((magicValue, timestamp)) =>
- val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- new Message(
- key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
- bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
- timestamp = timestamp,
- magicValue = magicValue
- )
+ getMagicAndTimestamp(partitionFor(group.groupId)) match {
+ case Some((magicValue, timestampType, timestamp)) =>
+ val records = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ Record.create(magicValue, timestampType, timestamp,
+ GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
+ GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
}.toSeq
val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
- val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
- new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+ val entries = Map(offsetTopicPartition -> MemoryRecords.withRecords(timestampType, compressionType, records:_*))
// set the callback function to insert offsets into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
@@ -330,7 +313,7 @@ class GroupMetadataManager(val brokerId: Int,
group.prepareOffsetCommit(offsetMetadata)
}
- Some(DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback))
+ Some(DelayedStore(entries, putCacheCallback))
case None =>
val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
@@ -412,28 +395,30 @@ class GroupMetadataManager(val brokerId: Int,
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear()
- val messages = log.read(currOffset, config.loadBufferSize, minOneMessage = true).messageSet.asInstanceOf[FileMessageSet]
- messages.readInto(buffer, 0)
- val messageSet = new ByteBufferMessageSet(buffer)
- messageSet.foreach { msgAndOffset =>
- require(msgAndOffset.message.key != null, "Offset entry key should not be null")
- val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
+ val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords]
+ fileRecords.readInto(buffer, 0)
+
+ MemoryRecords.readableRecords(buffer).deepIterator.asScala.foreach { entry =>
+ val record = entry.record
+
+ require(record.hasKey, "Offset entry key should not be null")
+ val baseKey = GroupMetadataManager.readMessageKey(record.key)
if (baseKey.isInstanceOf[OffsetKey]) {
// load offset
val key = baseKey.key.asInstanceOf[GroupTopicPartition]
- if (msgAndOffset.message.payload == null) {
+ if (record.hasNullValue) {
loadedOffsets.remove(key)
removedOffsets.add(key)
} else {
- val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
+ val value = GroupMetadataManager.readOffsetMessageValue(record.value)
loadedOffsets.put(key, value)
removedOffsets.remove(key)
}
} else {
// load group metadata
val groupId = baseKey.key.asInstanceOf[String]
- val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
if (groupMetadata != null) {
trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
removedGroups.remove(groupId)
@@ -444,7 +429,7 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- currOffset = msgAndOffset.nextOffset
+ currOffset = entry.nextOffset
}
}
@@ -467,8 +452,8 @@ class GroupMetadataManager(val brokerId: Int,
removedGroups.foreach { groupId =>
if (groupMetadataCache.contains(groupId))
- throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " +
- s"loading partition ${topicPartition}")
+ throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+ s"loading partition $topicPartition")
}
if (!shuttingDown.get())
@@ -572,15 +557,15 @@ class GroupMetadataManager(val brokerId: Int,
}
val offsetsPartition = partitionFor(groupId)
- getMessageFormatVersionAndTimestamp(offsetsPartition) match {
- case Some((magicValue, timestamp)) =>
+ getMagicAndTimestamp(offsetsPartition) match {
+ case Some((magicValue, timestampType, timestamp)) =>
val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
partitionOpt.foreach { partition =>
val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
val tombstones = expiredOffsets.map { case (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
- new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+ Record.create(magicValue, timestampType, timestamp, commitKey, null)
}.toBuffer
trace(s"Marked ${expiredOffsets.size} offsets in $appendPartition for deletion.")
@@ -590,8 +575,7 @@ class GroupMetadataManager(val brokerId: Int,
// Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
- tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
- timestamp = timestamp, magicValue = magicValue)
+ tombstones += Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), null)
trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
}
@@ -599,7 +583,7 @@ class GroupMetadataManager(val brokerId: Int,
try {
// do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+ partition.appendRecordsToLeader(MemoryRecords.withRecords(timestampType, compressionType, tombstones: _*))
offsetsRemoved += expiredOffsets.size
trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId")
} catch {
@@ -663,16 +647,11 @@ class GroupMetadataManager(val brokerId: Int,
* @param partition Partition of GroupMetadataTopic
* @return Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
*/
- private def getMessageFormatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] = {
+ private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = {
val groupMetadataTopicAndPartition = TopicAndPartition(Topic.GroupMetadataTopicName, partition)
- replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).map { messageFormatVersion =>
- val timestamp = {
- if (messageFormatVersion == Message.MagicValue_V0)
- Message.NoTimestamp
- else
- time.milliseconds()
- }
- (messageFormatVersion, timestamp)
+ replicaManager.getMagicAndTimestampType(groupMetadataTopicAndPartition).map { case (messageFormatVersion, timestampType) =>
+ val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds()
+ (messageFormatVersion, timestampType, timestamp)
}
}
@@ -964,7 +943,7 @@ object GroupMetadataManager {
* @return an offset-metadata object from the message
*/
def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
- if(buffer == null) { // tombstone
+ if (buffer == null) { // tombstone
null
} else {
val version = buffer.getShort
@@ -997,7 +976,7 @@ object GroupMetadataManager {
* @return a group metadata object from the message
*/
def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
- if(buffer == null) { // tombstone
+ if (buffer == null) { // tombstone
null
} else {
val version = buffer.getShort
@@ -1016,23 +995,22 @@ object GroupMetadataManager {
group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
- memberMetadataArray.foreach {
- case memberMetadataObj =>
- val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
- val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
- val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
- val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
- val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
- val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
+ memberMetadataArray.foreach { memberMetadataObj =>
+ val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
+ val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+ val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
+ val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
+ val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
+ val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
- val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
+ val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
- protocolType, List((group.protocol, subscription)))
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ protocolType, List((group.protocol, subscription)))
- member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
+ member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
- group.add(memberId, member)
+ group.add(memberId, member)
}
group
@@ -1087,7 +1065,7 @@ object GroupMetadataManager {
}
-case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+case class DelayedStore(partitionRecords: Map[TopicPartition, MemoryRecords],
callback: Map[TopicPartition, PartitionResponse] => Unit)
case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
deleted file mode 100755
index 506f5b9..0000000
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.util.concurrent.atomic._
-import java.util.concurrent.TimeUnit
-
-import kafka.utils._
-import kafka.message._
-import kafka.common.KafkaException
-import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
-import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.record.FileRecords
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * An on-disk message set. An optional start and end position can be applied to the message set
- * which will allow slicing a subset of the file.
- * @param file The file name for the underlying log data
- * @param channel the underlying file channel used
- * @param start A lower bound on the absolute position in the file from which the message set begins
- * @param end The upper bound on the absolute position in the file at which the message set ends
- * @param isSlice Should the start and end parameters be used for slicing?
- */
-@nonthreadsafe
-class FileMessageSet private[kafka](@volatile var file: File,
- private[log] val channel: FileChannel,
- private[log] val start: Int,
- private[log] val end: Int,
- isSlice: Boolean) extends MessageSet {
- /* the size of the message set in bytes */
- private val _size =
- if(isSlice)
- new AtomicInteger(end - start) // don't check the file size if this is just a slice view
- else
- new AtomicInteger(math.min(channel.size.toInt, end) - start)
-
- /* if this is not a slice, update the file pointer to the end of the file */
- if (!isSlice)
- /* set the file position to the last byte in the file */
- channel.position(math.min(channel.size.toInt, end))
-
- /**
- * Create a file message set with no slicing.
- */
- def this(file: File, channel: FileChannel) =
- this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
-
- /**
- * Create a file message set with no slicing
- */
- def this(file: File) =
- this(file, FileMessageSet.openChannel(file, mutable = true))
-
- /**
- * Create a file message set with no slicing, and with initFileSize and preallocate.
- * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
- * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
- * If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue.
- */
- def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
- this(file,
- channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
- start = 0,
- end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
- isSlice = false)
-
- /**
- * Create a file message set with mutable option
- */
- def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
-
- /**
- * Create a slice view of the file message set that begins and ends at the given byte offsets
- */
- def this(file: File, channel: FileChannel, start: Int, end: Int) =
- this(file, channel, start, end, isSlice = true)
-
- /**
- * Return a message set which is a view into this set starting from the given position and with the given size limit.
- *
- * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
- *
- * If this message set is already sliced, the position will be taken relative to that slicing.
- *
- * @param position The start position to begin the read from
- * @param size The number of bytes after the start position to include
- *
- * @return A sliced wrapper on this message set limited based on the given position and size
- */
- def read(position: Int, size: Int): FileMessageSet = {
- if(position < 0)
- throw new IllegalArgumentException("Invalid position: " + position)
- if(size < 0)
- throw new IllegalArgumentException("Invalid size: " + size)
- new FileMessageSet(file,
- channel,
- start = this.start + position,
- end = {
- // Handle the integer overflow
- if (this.start + position + size < 0)
- sizeInBytes()
- else
- math.min(this.start + position + size, sizeInBytes())
- })
- }
-
- override def asRecords: FileRecords = new FileRecords(file, channel, start, end, isSlice)
-
- /**
- * Search forward for the file position of the last offset that is greater than or equal to the target offset
- * and return its physical position and the size of the message (including log overhead) at the returned offset. If
- * no such offsets are found, return null.
- *
- * @param targetOffset The offset to search for.
- * @param startingPosition The starting position in the file to begin searching from.
- */
- def searchForOffsetWithSize(targetOffset: Long, startingPosition: Int): (OffsetPosition, Int) = {
- var position = startingPosition
- val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
- val size = sizeInBytes()
- while(position + MessageSet.LogOverhead < size) {
- buffer.rewind()
- channel.read(buffer, position)
- if(buffer.hasRemaining)
- throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
- .format(targetOffset, startingPosition, file.getAbsolutePath))
- buffer.rewind()
- val offset = buffer.getLong()
- val messageSize = buffer.getInt()
- if (messageSize < Message.MinMessageOverhead)
- throw new IllegalStateException("Invalid message size: " + messageSize)
- if (offset >= targetOffset)
- return (OffsetPosition(offset, position), messageSize + MessageSet.LogOverhead)
- position += MessageSet.LogOverhead + messageSize
- }
- null
- }
-
- /**
- * Search forward for the message whose timestamp is greater than or equals to the target timestamp.
- *
- * @param targetTimestamp The timestamp to search for.
- * @param startingPosition The starting position to search.
- * @return The timestamp and offset of the message found. None, if no message is found.
- */
- def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[TimestampOffset] = {
- val messagesToSearch = read(startingPosition, sizeInBytes)
- for (messageAndOffset <- messagesToSearch) {
- val message = messageAndOffset.message
- if (message.timestamp >= targetTimestamp) {
- // We found a message
- message.compressionCodec match {
- case NoCompressionCodec =>
- return Some(TimestampOffset(messageAndOffset.message.timestamp, messageAndOffset.offset))
- case _ =>
- // Iterate over the inner messages to get the exact offset.
- for (innerMessageAndOffset <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
- val timestamp = innerMessageAndOffset.message.timestamp
- if (timestamp >= targetTimestamp)
- return Some(TimestampOffset(innerMessageAndOffset.message.timestamp, innerMessageAndOffset.offset))
- }
- throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset}" +
- s" should contain target timestamp $targetTimestamp but it does not.")
- }
- }
- }
- None
- }
-
- /**
- * Return the largest timestamp of the messages after a given position in this file message set.
- * @param startingPosition The starting position.
- * @return The largest timestamp of the messages after the given position.
- */
- def largestTimestampAfter(startingPosition: Int): TimestampOffset = {
- var maxTimestamp = Message.NoTimestamp
- var offsetOfMaxTimestamp = -1L
- val messagesToSearch = read(startingPosition, Int.MaxValue)
- for (messageAndOffset <- messagesToSearch) {
- if (messageAndOffset.message.timestamp > maxTimestamp) {
- maxTimestamp = messageAndOffset.message.timestamp
- offsetOfMaxTimestamp = messageAndOffset.offset
- }
- }
- TimestampOffset(maxTimestamp, offsetOfMaxTimestamp)
- }
-
- /**
- * This method is called before we write messages to the socket using zero-copy transfer. We need to
- * make sure all the messages in the message set have the expected magic value.
- *
- * @param expectedMagicValue the magic value expected
- * @return true if all messages have expected magic value, false otherwise
- */
- override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
- var location = start
- val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
- val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength)
- while (location < end) {
- offsetAndSizeBuffer.rewind()
- channel.read(offsetAndSizeBuffer, location)
- if (offsetAndSizeBuffer.hasRemaining)
- return true
- offsetAndSizeBuffer.rewind()
- offsetAndSizeBuffer.getLong // skip offset field
- val messageSize = offsetAndSizeBuffer.getInt
- if (messageSize < Message.MinMessageOverhead)
- throw new IllegalStateException("Invalid message size: " + messageSize)
- crcAndMagicByteBuffer.rewind()
- channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead)
- if (crcAndMagicByteBuffer.get(Message.MagicOffset) != expectedMagicValue)
- return false
- location += (MessageSet.LogOverhead + messageSize)
- }
- true
- }
-
- /**
- * Convert this message set to use the specified message format.
- */
- def toMessageFormat(toMagicValue: Byte): MessageSet = {
- val offsets = new ArrayBuffer[Long]
- val newMessages = new ArrayBuffer[Message]
- this.foreach { messageAndOffset =>
- val message = messageAndOffset.message
- if (message.compressionCodec == NoCompressionCodec) {
- newMessages += message.toFormatVersion(toMagicValue)
- offsets += messageAndOffset.offset
- } else {
- // File message set only has shallow iterator. We need to do deep iteration here if needed.
- val deepIter = ByteBufferMessageSet.deepIterator(messageAndOffset)
- for (innerMessageAndOffset <- deepIter) {
- newMessages += innerMessageAndOffset.message.toFormatVersion(toMagicValue)
- offsets += innerMessageAndOffset.offset
- }
- }
- }
-
- if (sizeInBytes > 0 && newMessages.isEmpty) {
- // This indicates that the message is too large. We just return all the bytes in the file message set.
- this
- } else {
- // We use the offset seq to assign offsets so the offset of the messages does not change.
- new ByteBufferMessageSet(
- compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
- offsetSeq = offsets,
- newMessages: _*)
- }
- }
-
- /**
- * Get a shallow iterator over the messages in the set.
- */
- override def iterator: Iterator[MessageAndOffset] = iterator(Int.MaxValue)
-
- /**
- * Get an iterator over the messages in the set. We only do shallow iteration here.
- * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
- * If we encounter a message larger than this we throw an InvalidMessageException.
- * @return The iterator.
- */
- def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
- new IteratorTemplate[MessageAndOffset] {
- var location = start
- val sizeOffsetLength = 12
- val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength)
-
- override def makeNext(): MessageAndOffset = {
- if(location + sizeOffsetLength >= end)
- return allDone()
-
- // read the size of the item
- sizeOffsetBuffer.rewind()
- channel.read(sizeOffsetBuffer, location)
- if(sizeOffsetBuffer.hasRemaining)
- return allDone()
-
- sizeOffsetBuffer.rewind()
- val offset = sizeOffsetBuffer.getLong()
- val size = sizeOffsetBuffer.getInt()
- if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end)
- return allDone()
- if(size > maxMessageSize)
- throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
-
- // read the item itself
- val buffer = ByteBuffer.allocate(size)
- channel.read(buffer, location + sizeOffsetLength)
- if(buffer.hasRemaining)
- return allDone()
- buffer.rewind()
-
- // increment the location and return the item
- location += size + sizeOffsetLength
- MessageAndOffset(new Message(buffer), offset)
- }
- }
- }
-
- /**
- * The number of bytes taken up by this file set
- */
- def sizeInBytes(): Int = _size.get()
-
- /**
- * Append these messages to the message set
- */
- def append(messages: ByteBufferMessageSet) {
- val written = messages.writeFullyTo(channel)
- _size.getAndAdd(written)
- }
-
- /**
- * Commit all written data to the physical disk
- */
- def flush() = {
- channel.force(true)
- }
-
- /**
- * Close this message set
- */
- def close() {
- flush()
- trim()
- channel.close()
- }
-
- /**
- * Trim file when close or roll to next file
- */
- def trim() {
- truncateTo(sizeInBytes())
- }
-
- /**
- * Delete this message set from the filesystem
- * @return True iff this message set was deleted.
- */
- def delete(): Boolean = {
- CoreUtils.swallow(channel.close())
- file.delete()
- }
-
- /**
- * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
- * given size falls on a valid message boundary.
- * In some versions of the JDK truncating to the same size as the file message set will cause an
- * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
- * size of the underlying FileChannel.
- * It is expected that no other threads will do writes to the log when this function is called.
- * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
- * @return The number of bytes truncated off
- */
- def truncateTo(targetSize: Int): Int = {
- val originalSize = sizeInBytes
- if(targetSize > originalSize || targetSize < 0)
- throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
- " size of this log segment is " + originalSize + " bytes.")
- if (targetSize < channel.size.toInt) {
- channel.truncate(targetSize)
- channel.position(targetSize)
- _size.set(targetSize)
- }
- originalSize - targetSize
- }
-
- /**
- * Read from the underlying file into the buffer starting at the given position
- */
- def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
- channel.read(buffer, relativePosition + this.start)
- buffer.flip()
- buffer
- }
-
- /**
- * Rename the file that backs this message set
- * @throws IOException if rename fails.
- */
- def renameTo(f: File) {
- try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
- finally this.file = f
- }
-
-}
-
-object FileMessageSet extends Logging
-{
- //preserve the previous logger name after moving logger aspect from FileMessageSet to companion
- override val loggerName = classOf[FileMessageSet].getName
-
- /**
- * Open a channel for the given file
- * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
- * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
- * @param file File path
- * @param mutable mutable
- * @param fileAlreadyExists File already exists or not
- * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
- * @param preallocate Pre allocate file or not, gotten from configuration.
- */
- def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
- if (mutable) {
- if (fileAlreadyExists)
- new RandomAccessFile(file, "rw").getChannel()
- else {
- if (preallocate) {
- val randomAccessFile = new RandomAccessFile(file, "rw")
- randomAccessFile.setLength(initFileSize)
- randomAccessFile.getChannel()
- }
- else
- new RandomAccessFile(file, "rw").getChannel()
- }
- }
- else
- new FileInputStream(file).getChannel()
- }
-}
-
-object LogFlushStats extends KafkaMetricsGroup {
- val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6acc8d2..d58a066 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -19,7 +19,6 @@ package kafka.log
import kafka.api.KAFKA_0_10_0_IV0
import kafka.utils._
-import kafka.message._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
@@ -29,16 +28,18 @@ import java.util.concurrent.atomic._
import java.text.NumberFormat
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetRequest
import scala.collection.Seq
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.{Time, Utils}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
object LogAppendInfo {
- val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP,
+ NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
@@ -243,7 +244,7 @@ class Log(@volatile var dir: File,
val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
- val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
+ val swapSegment = new LogSegment(FileRecords.open(swapFile),
index = index,
timeIndex = timeIndex,
baseOffset = startOffset,
@@ -338,20 +339,20 @@ class Log(@volatile var dir: File,
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
- * @param messages The message set to append
+ * @param records The log records to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
- def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
- val appendInfo = analyzeAndValidateMessageSet(messages)
+ def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
+ val appendInfo = analyzeAndValidateRecords(records)
// if we have any valid messages, append them to the log
if (appendInfo.shallowCount == 0)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
- var validMessages = trimInvalidBytes(messages, appendInfo)
+ var validRecords = trimInvalidBytes(records, appendInfo)
try {
// they are valid, insert them in the log
@@ -363,20 +364,21 @@ class Log(@volatile var dir: File,
appendInfo.firstOffset = offset.value
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
- validMessages.validateMessagesAndAssignOffsets(offset,
- now,
- appendInfo.sourceCodec,
- appendInfo.targetCodec,
- config.compact,
- config.messageFormatVersion.messageFormatVersion,
- config.messageTimestampType,
- config.messageTimestampDifferenceMaxMs)
+ LogValidator.validateMessagesAndAssignOffsets(validRecords,
+ offset,
+ now,
+ appendInfo.sourceCodec,
+ appendInfo.targetCodec,
+ config.compact,
+ config.messageFormatVersion.messageFormatVersion,
+ config.messageTimestampType,
+ config.messageTimestampDifferenceMaxMs)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
- validMessages = validateAndOffsetAssignResult.validatedMessages
+ validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
- appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp
+ appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
@@ -384,14 +386,14 @@ class Log(@volatile var dir: File,
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
- for (messageAndOffset <- validMessages.shallowIterator) {
- if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+ for (logEntry <- validRecords.shallowIterator.asScala) {
+ if (logEntry.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
- .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+ .format(logEntry.sizeInBytes, config.maxMessageSize))
}
}
}
@@ -399,28 +401,27 @@ class Log(@volatile var dir: File,
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
- throw new IllegalArgumentException("Out of order offsets found in " + messages)
+ throw new IllegalArgumentException("Out of order offsets found in " + records.deepIterator.asScala.map(_.offset))
}
// check messages set size may be exceed config.segmentSize
- if (validMessages.sizeInBytes > config.segmentSize) {
+ if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
- .format(validMessages.sizeInBytes, config.segmentSize))
+ .format(validRecords.sizeInBytes, config.segmentSize))
}
// maybe roll the log if this segment is full
- val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
- maxTimestampInMessages = appendInfo.maxTimestamp)
+ val segment = maybeRoll(messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp)
// now append to the log
segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
- offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages)
+ shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
- .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
+ .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
if (unflushedMessages >= config.flushInterval)
flush()
@@ -449,73 +450,74 @@ class Log(@volatile var dir: File,
* <li> Whether any compression codec is used (if many are used, then the last one is given)
* </ol>
*/
- private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
+ private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset, lastOffset = -1L
var sourceCodec: CompressionCodec = NoCompressionCodec
var monotonic = true
- var maxTimestamp = Message.NoTimestamp
+ var maxTimestamp = Record.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
- for(messageAndOffset <- messages.shallowIterator) {
+ for (entry <- records.shallowIterator.asScala) {
// update the first offset if on the first message
if(firstOffset < 0)
- firstOffset = messageAndOffset.offset
+ firstOffset = entry.offset
// check that offsets are monotonically increasing
- if(lastOffset >= messageAndOffset.offset)
+ if(lastOffset >= entry.offset)
monotonic = false
// update the last offset seen
- lastOffset = messageAndOffset.offset
+ lastOffset = entry.offset
- val m = messageAndOffset.message
+ val record = entry.record
// Check if the message sizes are valid.
- val messageSize = MessageSet.entrySize(m)
+ val messageSize = entry.sizeInBytes
if(messageSize > config.maxMessageSize) {
- BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(messageSize, config.maxMessageSize))
}
// check the validity of the message by checking CRC
- m.ensureValid()
- if (m.timestamp > maxTimestamp) {
- maxTimestamp = m.timestamp
+ record.ensureValid()
+ if (record.timestamp > maxTimestamp) {
+ maxTimestamp = record.timestamp
offsetOfMaxTimestamp = lastOffset
}
shallowMessageCount += 1
validBytesCount += messageSize
- val messageCodec = m.compressionCodec
- if(messageCodec != NoCompressionCodec)
+ val messageCodec = CompressionCodec.getCompressionCodec(record.compressionType.id)
+ if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
}
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
- LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Record.NO_TIMESTAMP, sourceCodec,
+ targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
/**
* Trim any invalid bytes from the end of this message set (if there are any)
*
- * @param messages The message set to trim
+ * @param records The records to trim
* @param info The general information of the message set
* @return A trimmed message set. This may be the same as what was passed in or it may not.
*/
- private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
- val messageSetValidBytes = info.validBytes
- if(messageSetValidBytes < 0)
- throw new CorruptRecordException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
- if(messageSetValidBytes == messages.sizeInBytes) {
- messages
+ private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
+ val validBytes = info.validBytes
+ if (validBytes < 0)
+ throw new CorruptRecordException("Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+ if (validBytes == records.sizeInBytes) {
+ records
} else {
// trim invalid bytes
- val validByteBuffer = messages.buffer.duplicate()
- validByteBuffer.limit(messageSetValidBytes)
- new ByteBufferMessageSet(validByteBuffer)
+ val validByteBuffer = records.buffer.duplicate()
+ validByteBuffer.limit(validBytes)
+ MemoryRecords.readableRecords(validByteBuffer)
}
}
@@ -538,7 +540,7 @@ class Log(@volatile var dir: File,
val currentNextOffsetMetadata = nextOffsetMetadata
val next = currentNextOffsetMetadata.messageOffset
if(startOffset == next)
- return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
+ return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
var entry = segments.floorEntry(startOffset)
@@ -578,7 +580,7 @@ class Log(@volatile var dir: File,
// We are beyond the end of the last segment with no data fetched, although the start offset is in range.
// This can happen when all messages with offsets larger than the start offset have been deleted.
// In this case, we return the empty set with the log end offset metadata.
- FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
+ FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
/**
@@ -610,9 +612,9 @@ class Log(@volatile var dir: File,
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
- return Some(TimestampOffset(Message.NoTimestamp, segmentsCopy.head.baseOffset))
+ return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
- return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
+ return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset))
val targetSeg = {
// Get all the segments whose largest timestamp is smaller than target timestamp
@@ -656,7 +658,7 @@ class Log(@volatile var dir: File,
if (segments.size == numToDelete)
roll()
// remove the segments for lookups
- deletable.foreach(deleteSegment(_))
+ deletable.foreach(deleteSegment)
}
numToDelete
}
@@ -865,7 +867,7 @@ class Log(@volatile var dir: File,
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
- deletable.foreach(deleteSegment(_))
+ deletable.foreach(deleteSegment)
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
@@ -882,7 +884,7 @@ class Log(@volatile var dir: File,
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val segmentsToDelete = logSegments.toList
- segmentsToDelete.foreach(deleteSegment(_))
+ segmentsToDelete.foreach(deleteSegment)
addSegment(new LogSegment(dir,
newOffset,
indexIntervalBytes = config.indexInterval,
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4a76b0c..c5a73d5 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,20 +17,21 @@
package kafka.log
-import java.io.{DataOutputStream, File}
+import java.io.File
import java.nio._
import java.util.Date
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Gauge
import kafka.common._
-import kafka.message._
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import org.apache.kafka.common.record.{FileRecords, LogEntry, MemoryRecords}
import org.apache.kafka.common.utils.Time
+import MemoryRecords.LogEntryFilter
-import scala.Iterable
import scala.collection._
+import JavaConverters._
/**
* The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -390,10 +391,10 @@ private[log] class Cleaner(val id: Int,
val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
indexFile.delete()
timeIndexFile.delete()
- val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
+ val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
- val cleaned = new LogSegment(messages, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+ val cleaned = new LogSegment(records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
try {
// clean segments into the new destination segment
@@ -449,8 +450,12 @@ private[log] class Cleaner(val id: Int,
retainDeletes: Boolean,
maxLogMessageSize: Int,
stats: CleanerStats) {
- def shouldRetain(messageAndOffset: MessageAndOffset): Boolean =
- shouldRetainMessage(source, map, retainDeletes, messageAndOffset, stats)
+ def shouldRetainEntry(logEntry: LogEntry): Boolean =
+ shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
+
+ class LogCleanerFilter extends LogEntryFilter {
+ def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainEntry(logEntry)
+ }
var position = 0
while (position < source.log.sizeInBytes) {
@@ -460,10 +465,9 @@ private[log] class Cleaner(val id: Int,
writeBuffer.clear()
source.log.readInto(readBuffer, position)
- val messages = new ByteBufferMessageSet(readBuffer)
- throttler.maybeThrottle(messages.sizeInBytes)
- val result = messages.filterInto(writeBuffer, shouldRetain)
-
+ val records = MemoryRecords.readableRecords(readBuffer)
+ throttler.maybeThrottle(records.sizeInBytes)
+ val result = records.filterTo(new LogCleanerFilter, writeBuffer)
stats.readMessages(result.messagesRead, result.bytesRead)
stats.recopyMessages(result.messagesRetained, result.bytesRetained)
@@ -472,9 +476,10 @@ private[log] class Cleaner(val id: Int,
// if any messages are to be retained, write them out
if (writeBuffer.position > 0) {
writeBuffer.flip()
- val retained = new ByteBufferMessageSet(writeBuffer)
- dest.append(firstOffset = retained.head.offset, largestTimestamp = result.maxTimestamp,
- offsetOfLargestTimestamp = result.offsetOfMaxTimestamp, messages = retained)
+
+ val retained = MemoryRecords.readableRecords(writeBuffer)
+ dest.append(firstOffset = retained.deepIterator().next().offset, largestTimestamp = result.maxTimestamp,
+ shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained)
throttler.maybeThrottle(writeBuffer.limit)
}
@@ -488,21 +493,22 @@ private[log] class Cleaner(val id: Int,
private def shouldRetainMessage(source: kafka.log.LogSegment,
map: kafka.log.OffsetMap,
retainDeletes: Boolean,
- entry: kafka.message.MessageAndOffset,
+ entry: LogEntry,
stats: CleanerStats): Boolean = {
val pastLatestOffset = entry.offset > map.latestOffset
if (pastLatestOffset)
return true
- val key = entry.message.key
- if (key != null) {
+
+ if (entry.record.hasKey) {
+ val key = entry.record.key
val foundOffset = map.get(key)
/* two cases in which we can get rid of a message:
* 1) if there exists a message with the same key but higher offset
* 2) if the message is a delete "tombstone" marker and enough time has passed
*/
val redundant = foundOffset >= 0 && entry.offset < foundOffset
- val obsoleteDelete = !retainDeletes && entry.message.isNull
+ val obsoleteDelete = !retainDeletes && entry.record.hasNullValue
!redundant && !obsoleteDelete
} else {
stats.invalidMessage()
@@ -620,12 +626,12 @@ private[log] class Cleaner(val id: Int,
checkDone(topicAndPartition)
readBuffer.clear()
segment.log.readInto(readBuffer, position)
- val messages = new ByteBufferMessageSet(readBuffer)
- throttler.maybeThrottle(messages.sizeInBytes)
+ val records = MemoryRecords.readableRecords(readBuffer)
+ throttler.maybeThrottle(records.sizeInBytes)
val startPosition = position
- for (entry <- messages) {
- val message = entry.message
+ for (entry <- records.deepIterator.asScala) {
+ val message = entry.record
if (message.hasKey && entry.offset >= start) {
if (map.size < maxDesiredMapSize)
map.put(message.key, entry.offset)
@@ -634,8 +640,9 @@ private[log] class Cleaner(val id: Int,
}
stats.indexMessagesRead(1)
}
- position += messages.validBytes
- stats.indexBytesRead(messages.validBytes)
+ val bytesRead = records.validBytes
+ position += bytesRead
+ stats.indexBytesRead(bytesRead)
// if we didn't read even one complete message, our read buffer may be too small
if(position == startPosition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ed79946..953fca4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -440,7 +440,7 @@ class LogManager(val logDirs: Array[File],
removedLog.dir = renamedDir
// change the file pointers for log and index file
for (logSegment <- removedLog.logSegments) {
- logSegment.log.file = new File(renamedDir, logSegment.log.file.getName)
+ logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
}