You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/10 01:40:06 UTC

kafka git commit: KAFKA-4861; GroupMetadataManager record is rejected if broker configured with LogAppendTime

Repository: kafka
Updated Branches:
  refs/heads/trunk dbcbd7920 -> 7565dcd8b


KAFKA-4861; GroupMetadataManager record is rejected if broker configured with LogAppendTime

The record should be created with CreateTime (like in the producer). The conversion to
LogAppendTime happens automatically (if necessary).

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2657 from ijuma/kafka-4861-log-append-time-breaks-group-data-manager


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7565dcd8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7565dcd8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7565dcd8

Branch: refs/heads/trunk
Commit: 7565dcd8b0547f91a5d9d19771d9cd6693079d01
Parents: dbcbd79
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Mar 9 16:45:41 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Mar 9 16:45:41 2017 -0800

----------------------------------------------------------------------
 .../coordinator/GroupMetadataManager.scala      | 33 ++++----
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  6 +-
 .../kafka/api/BaseConsumerTest.scala            |  2 +-
 .../kafka/api/ConsumerBounceTest.scala          |  3 +-
 .../kafka/api/FixedPortTestUtils.scala          |  9 +-
 .../kafka/api/IntegrationTestHarness.scala      |  2 +-
 .../kafka/api/LogAppendTimeTest.scala           | 86 ++++++++++++++++++++
 .../GroupCoordinatorResponseTest.scala          | 12 +--
 .../coordinator/GroupMetadataManagerTest.scala  | 16 ++--
 .../integration/KafkaServerTestHarness.scala    |  4 +-
 11 files changed, 128 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index a6ed6a9..d48328d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -133,8 +133,8 @@ class GroupMetadataManager(val brokerId: Int,
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Errors => Unit): Option[DelayedStore] = {
-    getMagicAndTimestamp(partitionFor(group.groupId)) match {
-      case Some((magicValue, timestampType, timestamp)) =>
+    getMagic(partitionFor(group.groupId)) match {
+      case Some(magicValue) =>
         val groupMetadataValueVersion = {
           if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
             0.toShort
@@ -142,6 +142,9 @@ class GroupMetadataManager(val brokerId: Int,
             GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
         }
 
+        // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+        val timestampType = TimestampType.CREATE_TIME
+        val timestamp = time.milliseconds()
         val record = Record.create(magicValue, timestampType, timestamp,
           GroupMetadataManager.groupMetadataKey(group.groupId),
           GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion))
@@ -231,8 +234,11 @@ class GroupMetadataManager(val brokerId: Int,
     }
 
     // construct the message set to append
-    getMagicAndTimestamp(partitionFor(group.groupId)) match {
-      case Some((magicValue, timestampType, timestamp)) =>
+    getMagic(partitionFor(group.groupId)) match {
+      case Some(magicValue) =>
+        // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+        val timestampType = TimestampType.CREATE_TIME
+        val timestamp = time.milliseconds()
         val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
           Record.create(magicValue, timestampType, timestamp,
             GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition),
@@ -575,8 +581,12 @@ class GroupMetadataManager(val brokerId: Int,
 
       val offsetsPartition = partitionFor(groupId)
       val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-      getMagicAndTimestamp(offsetsPartition) match {
-        case Some((magicValue, timestampType, timestamp)) =>
+      getMagic(offsetsPartition) match {
+        case Some(magicValue) =>
+          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+          val timestampType = TimestampType.CREATE_TIME
+          val timestamp = time.milliseconds()
+
           val partitionOpt = replicaManager.getPartition(appendPartition)
           partitionOpt.foreach { partition =>
             val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) =>
@@ -652,15 +662,10 @@ class GroupMetadataManager(val brokerId: Int,
    * Check if the replica is local and return the message format version and timestamp
    *
    * @param   partition  Partition of GroupMetadataTopic
-   * @return  Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise
+   * @return  Some(MessageFormatVersion) if replica is local, None otherwise
    */
-  private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partition)
-    replicaManager.getMagicAndTimestampType(groupMetadataTopicPartition).map { case (messageFormatVersion, timestampType) =>
-      val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds()
-      (messageFormatVersion, timestampType, timestamp)
-    }
-  }
+  private def getMagic(partition: Int): Option[Byte] =
+    replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition))
 
   /**
    * Add the partition into the owned list

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fa5afe5..24a224a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -470,7 +470,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           // Please note that if the message format is changed from a higher version back to lower version this
           // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
           // without format down conversion.
-          val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) &&
+          val convertedData = if (versionId <= 1 && replicaManager.getMagic(tp).exists(_ > Record.MAGIC_VALUE_V0) &&
             !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) {
             trace(s"Down converting message to V0 for fetch request from $clientId")
             val downConvertedRecords = data.records.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1cec4a2..4ab8c2a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -618,10 +618,8 @@ class ReplicaManager(val config: KafkaConfig,
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
   }
 
-  def getMagicAndTimestampType(topicPartition: TopicPartition): Option[(Byte, TimestampType)] =
-    getReplica(topicPartition).flatMap { replica =>
-      replica.log.map(log => (log.config.messageFormatVersion.messageFormatVersion, log.config.messageTimestampType))
-    }
+  def getMagic(topicPartition: TopicPartition): Option[Byte] =
+    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 802bab8..27b89d5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.errors.WakeupException
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
  */
-abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
+abstract class BaseConsumerTest extends IntegrationTestHarness {
 
   val epsilon = 0.1
   val producerCount = 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f0e0c9e..4ec77a1 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -46,7 +46,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   val executor = Executors.newScheduledThreadPool(2)
 
   // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
@@ -59,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
   override def generateConfigs() = {
-    FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect,enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, serverConfig))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index d15a01d..bf5f8c1 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -34,9 +34,7 @@ object FixedPortTestUtils {
       sockets.foreach(_.close())
       ports
     } catch {
-      case e: IOException => {
-        throw new RuntimeException(e)
-      }
+      case e: IOException => throw new RuntimeException(e)
     }
   }
 
@@ -45,8 +43,9 @@ object FixedPortTestUtils {
     enableControlledShutdown: Boolean = true,
     enableDeleteTopic: Boolean = false): Seq[Properties] = {
     val ports = FixedPortTestUtils.choosePorts(numConfigs)
-    (0 until numConfigs)
-      .map(node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node)))
+    (0 until numConfigs).map { node =>
+      TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node))
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 46465e8..5c8ceea 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -45,7 +45,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
-  override def generateConfigs() = {
+  override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
     cfgs.foreach { config =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
new file mode 100644
index 0000000..4a97bea
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.record.TimestampType
+import org.junit.{Before, Test}
+import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic
+  * level configs, see the *ProducerSendTest classes.
+  */
+class LogAppendTimeTest extends IntegrationTestHarness {
+  val producerCount: Int = 1
+  val consumerCount: Int = 1
+  val serverCount: Int = 2
+
+  // This will be used for the offsets topic as well
+  serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name)
+  serverConfig.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
+
+  private val topic = "topic"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    TestUtils.createTopic(zkUtils, topic, servers = servers)
+  }
+
+  @Test
+  def testProduceConsume() {
+    val producer = producers.head
+    val now = System.currentTimeMillis()
+    val createTime = now - TimeUnit.DAYS.toMillis(1)
+    val producerRecords = (1 to 10).map(i => new ProducerRecord(topic, null, createTime, s"key$i".getBytes,
+      s"value$i".getBytes))
+    val recordMetadatas = producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
+    recordMetadatas.foreach { recordMetadata =>
+      assertTrue(recordMetadata.timestamp >= now)
+      assertTrue(recordMetadata.timestamp < now + TimeUnit.SECONDS.toMillis(60))
+    }
+
+    val consumer = consumers.head
+    consumer.subscribe(Collections.singleton(topic))
+    val consumerRecords = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
+    TestUtils.waitUntilTrue(() => {
+      consumerRecords ++= consumer.poll(50).asScala
+      consumerRecords.size == producerRecords.size
+    }, s"Consumed ${consumerRecords.size} records until timeout instead of the expected ${producerRecords.size} records")
+
+    consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
+      val producerRecord = producerRecords(index)
+      val recordMetadata = recordMetadatas(index)
+      assertEquals(new String(producerRecord.key), new String(consumerRecord.key))
+      assertEquals(new String(producerRecord.value), new String(consumerRecord.value))
+      assertNotEquals(producerRecord.timestamp, consumerRecord.timestamp)
+      assertEquals(recordMetadata.timestamp, consumerRecord.timestamp)
+      assertEquals(TimestampType.LOG_APPEND_TIME, consumerRecord.timestampType)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 7b40187..22cb899 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -305,8 +305,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     timer.advanceClock(DefaultSessionTimeout + 100)
@@ -1055,8 +1054,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
           new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
@@ -1137,8 +1135,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
           new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -1149,8 +1146,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 86189aa..8cfae8d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -321,7 +321,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testStoreNonEmptyGroupWhenCoordinatorHasMoved() {
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
     val memberId = "memberId"
     val clientId = "clientId"
     val clientHost = "localhost"
@@ -390,7 +390,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testCommitOffsetWhenCoordinatorHasMoved() {
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
     val memberId = ""
     val generationId = -1
     val topicPartition = new TopicPartition("foo", 0)
@@ -538,8 +538,7 @@ class GroupMetadataManagerTest {
     EasyMock.reset(partition)
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -584,8 +583,7 @@ class GroupMetadataManagerTest {
     EasyMock.reset(partition)
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME))
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -602,7 +600,8 @@ class GroupMetadataManagerTest {
     assertTrue(metadataTombstone.hasKey)
     assertTrue(metadataTombstone.hasNullValue)
     assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
-    assertEquals(TimestampType.LOG_APPEND_TIME, metadataTombstone.timestampType)
+    // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+    assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
     assertTrue(metadataTombstone.timestamp > 0)
 
     val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
@@ -762,8 +761,7 @@ class GroupMetadataManagerTest {
           new PartitionResponse(error, 0L, Record.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject()))
-      .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
   }
 
   private def buildStableGroupRecordWithMember(memberId: String): Record = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 6fa7ad5..9f40ec6 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -47,7 +47,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
    * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
    * test and should not reuse previous configurations unless they select their ports randomly when servers are started.
    */
-  def generateConfigs(): Seq[KafkaConfig]
+  def generateConfigs: Seq[KafkaConfig]
 
   /**
    * Override this in case ACLs or security credentials must be set before `servers` are started.
@@ -64,7 +64,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
 
   def configs: Seq[KafkaConfig] = {
     if (instanceConfigs == null)
-      instanceConfigs = generateConfigs()
+      instanceConfigs = generateConfigs
     instanceConfigs
   }