You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/10 01:40:06 UTC
kafka git commit: KAFKA-4861; GroupMetadataManager record is rejected if broker configured with LogAppendTime
Repository: kafka
Updated Branches:
refs/heads/trunk dbcbd7920 -> 7565dcd8b
KAFKA-4861; GroupMetadataManager record is rejected if broker configured with LogAppendTime
The record should be created with CreateTime (like in the producer). The conversion to
LogAppendTime happens automatically (if necessary).
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2657 from ijuma/kafka-4861-log-append-time-breaks-group-data-manager
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7565dcd8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7565dcd8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7565dcd8
Branch: refs/heads/trunk
Commit: 7565dcd8b0547f91a5d9d19771d9cd6693079d01
Parents: dbcbd79
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Mar 9 16:45:41 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Mar 9 16:45:41 2017 -0800
----------------------------------------------------------------------
.../coordinator/GroupMetadataManager.scala | 33 ++++----
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/kafka/server/ReplicaManager.scala | 6 +-
.../kafka/api/BaseConsumerTest.scala | 2 +-
.../kafka/api/ConsumerBounceTest.scala | 3 +-
.../kafka/api/FixedPortTestUtils.scala | 9 +-
.../kafka/api/IntegrationTestHarness.scala | 2 +-
.../kafka/api/LogAppendTimeTest.scala | 86 ++++++++++++++++++++
.../GroupCoordinatorResponseTest.scala | 12 +--
.../coordinator/GroupMetadataManagerTest.scala | 16 ++--
.../integration/KafkaServerTestHarness.scala | 4 +-
11 files changed, 128 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index a6ed6a9..d48328d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -133,8 +133,8 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Option[DelayedStore] = {
- getMagicAndTimestamp(partitionFor(group.groupId)) match {
- case Some((magicValue, timestampType, timestamp)) =>
+ getMagic(partitionFor(group.groupId)) match {
+ case Some(magicValue) =>
val groupMetadataValueVersion = {
if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
0.toShort
@@ -142,6 +142,9 @@ class GroupMetadataManager(val brokerId: Int,
GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
}
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
val record = Record.create(magicValue, timestampType, timestamp,
GroupMetadataManager.groupMetadataKey(group.groupId),
GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion))
@@ -231,8 +234,11 @@ class GroupMetadataManager(val brokerId: Int,
}
// construct the message set to append
- getMagicAndTimestamp(partitionFor(group.groupId)) match {
- case Some((magicValue, timestampType, timestamp)) =>
+ getMagic(partitionFor(group.groupId)) match {
+ case Some(magicValue) =>
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
Record.create(magicValue, timestampType, timestamp,
GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition),
@@ -575,8 +581,12 @@ class GroupMetadataManager(val brokerId: Int,
val offsetsPartition = partitionFor(groupId)
val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
- getMagicAndTimestamp(offsetsPartition) match {
- case Some((magicValue, timestampType, timestamp)) =>
+ getMagic(offsetsPartition) match {
+ case Some(magicValue) =>
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
+
val partitionOpt = replicaManager.getPartition(appendPartition)
partitionOpt.foreach { partition =>
val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) =>
@@ -652,15 +662,10 @@ class GroupMetadataManager(val brokerId: Int,
* Check if the replica is local and return the message format version and timestamp
*
* @param partition Partition of GroupMetadataTopic
- * @return Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
+ * @return Some(MessageFormatVersion) if replica is local, None otherwise
*/
- private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = {
- val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partition)
- replicaManager.getMagicAndTimestampType(groupMetadataTopicPartition).map { case (messageFormatVersion, timestampType) =>
- val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds()
- (messageFormatVersion, timestampType, timestamp)
- }
- }
+ private def getMagic(partition: Int): Option[Byte] =
+ replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition))
/**
* Add the partition into the owned list
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fa5afe5..24a224a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -470,7 +470,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Please note that if the message format is changed from a higher version back to lower version this
// test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
// without format down conversion.
- val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) &&
+ val convertedData = if (versionId <= 1 && replicaManager.getMagic(tp).exists(_ > Record.MAGIC_VALUE_V0) &&
!data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) {
trace(s"Down converting message to V0 for fetch request from $clientId")
val downConvertedRecords = data.records.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1cec4a2..4ab8c2a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -618,10 +618,8 @@ class ReplicaManager(val config: KafkaConfig,
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
}
- def getMagicAndTimestampType(topicPartition: TopicPartition): Option[(Byte, TimestampType)] =
- getReplica(topicPartition).flatMap { replica =>
- replica.log.map(log => (log.config.messageFormatVersion.messageFormatVersion, log.config.messageTimestampType))
- }
+ def getMagic(topicPartition: TopicPartition): Option[Byte] =
+ getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 802bab8..27b89d5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.errors.WakeupException
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
*/
-abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
+abstract class BaseConsumerTest extends IntegrationTestHarness {
val epsilon = 0.1
val producerCount = 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f0e0c9e..4ec77a1 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -46,7 +46,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val executor = Executors.newScheduledThreadPool(2)
// configure the servers and clients
- this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
@@ -59,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
override def generateConfigs() = {
- FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect,enableControlledShutdown = false)
+ FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, serverConfig))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index d15a01d..bf5f8c1 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -34,9 +34,7 @@ object FixedPortTestUtils {
sockets.foreach(_.close())
ports
} catch {
- case e: IOException => {
- throw new RuntimeException(e)
- }
+ case e: IOException => throw new RuntimeException(e)
}
}
@@ -45,8 +43,9 @@ object FixedPortTestUtils {
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false): Seq[Properties] = {
val ports = FixedPortTestUtils.choosePorts(numConfigs)
- (0 until numConfigs)
- .map(node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node)))
+ (0 until numConfigs).map { node =>
+ TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 46465e8..5c8ceea 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -45,7 +45,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
- override def generateConfigs() = {
+ override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach { config =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
new file mode 100644
index 0000000..4a97bea
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.record.TimestampType
+import org.junit.{Before, Test}
+import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic
+ * level configs, see the *ProducerSendTest classes.
+ */
+class LogAppendTimeTest extends IntegrationTestHarness {
+ val producerCount: Int = 1
+ val consumerCount: Int = 1
+ val serverCount: Int = 2
+
+ // This will be used for the offsets topic as well
+ serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name)
+ serverConfig.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
+
+ private val topic = "topic"
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ TestUtils.createTopic(zkUtils, topic, servers = servers)
+ }
+
+ @Test
+ def testProduceConsume() {
+ val producer = producers.head
+ val now = System.currentTimeMillis()
+ val createTime = now - TimeUnit.DAYS.toMillis(1)
+ val producerRecords = (1 to 10).map(i => new ProducerRecord(topic, null, createTime, s"key$i".getBytes,
+ s"value$i".getBytes))
+ val recordMetadatas = producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
+ recordMetadatas.foreach { recordMetadata =>
+ assertTrue(recordMetadata.timestamp >= now)
+ assertTrue(recordMetadata.timestamp < now + TimeUnit.SECONDS.toMillis(60))
+ }
+
+ val consumer = consumers.head
+ consumer.subscribe(Collections.singleton(topic))
+ val consumerRecords = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
+ TestUtils.waitUntilTrue(() => {
+ consumerRecords ++= consumer.poll(50).asScala
+ consumerRecords.size == producerRecords.size
+ }, s"Consumed ${consumerRecords.size} records until timeout instead of the expected ${producerRecords.size} records")
+
+ consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
+ val producerRecord = producerRecords(index)
+ val recordMetadata = recordMetadatas(index)
+ assertEquals(new String(producerRecord.key), new String(consumerRecord.key))
+ assertEquals(new String(producerRecord.value), new String(consumerRecord.value))
+ assertNotEquals(producerRecord.timestamp, consumerRecord.timestamp)
+ assertEquals(recordMetadata.timestamp, consumerRecord.timestamp)
+ assertEquals(TimestampType.LOG_APPEND_TIME, consumerRecord.timestampType)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 7b40187..22cb899 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -305,8 +305,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
timer.advanceClock(DefaultSessionTimeout + 100)
@@ -1055,8 +1054,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
@@ -1137,8 +1135,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -1149,8 +1146,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val (responseFuture, responseCallback) = setupHeartbeatCallback
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 86189aa..8cfae8d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -321,7 +321,7 @@ class GroupMetadataManagerTest {
@Test
def testStoreNonEmptyGroupWhenCoordinatorHasMoved() {
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None)
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
val memberId = "memberId"
val clientId = "clientId"
val clientHost = "localhost"
@@ -390,7 +390,7 @@ class GroupMetadataManagerTest {
@Test
def testCommitOffsetWhenCoordinatorHasMoved() {
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None)
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
val memberId = ""
val generationId = -1
val topicPartition = new TopicPartition("foo", 0)
@@ -538,8 +538,7 @@ class GroupMetadataManagerTest {
EasyMock.reset(partition)
val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -584,8 +583,7 @@ class GroupMetadataManagerTest {
EasyMock.reset(partition)
val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME))
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -602,7 +600,8 @@ class GroupMetadataManagerTest {
assertTrue(metadataTombstone.hasKey)
assertTrue(metadataTombstone.hasNullValue)
assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
- assertEquals(TimestampType.LOG_APPEND_TIME, metadataTombstone.timestampType)
+ // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
assertTrue(metadataTombstone.timestamp > 0)
val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
@@ -762,8 +761,7 @@ class GroupMetadataManagerTest {
new PartitionResponse(error, 0L, Record.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
- .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
}
private def buildStableGroupRecordWithMember(memberId: String): Record = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 6fa7ad5..9f40ec6 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -47,7 +47,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*/
- def generateConfigs(): Seq[KafkaConfig]
+ def generateConfigs: Seq[KafkaConfig]
/**
* Override this in case ACLs or security credentials must be set before `servers` are started.
@@ -64,7 +64,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
def configs: Seq[KafkaConfig] = {
if (instanceConfigs == null)
- instanceConfigs = generateConfigs()
+ instanceConfigs = generateConfigs
instanceConfigs
}