You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/09/26 13:26:07 UTC
[kafka] branch trunk updated: KAFKA-7437;
Persist leader epoch in offset commit metadata (#5689)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f7267d KAFKA-7437; Persist leader epoch in offset commit metadata (#5689)
9f7267d is described below
commit 9f7267dd2fedde86bf15aabdbc5256e5fc617184
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Sep 26 06:25:55 2018 -0700
KAFKA-7437; Persist leader epoch in offset commit metadata (#5689)
This commit implements the changes described in KIP-320 for the persistence of leader epoch information in the offset commit protocol.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../scala/kafka/common/OffsetAndMetadata.scala | 52 +++++++++++
.../kafka/common/OffsetMetadataAndError.scala | 81 -----------------
.../coordinator/group/GroupMetadataManager.scala | 89 +++++++++++-------
core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++--
.../kafka/api/PlaintextConsumerTest.scala | 4 +-
.../integration/kafka/api/TransactionsTest.scala | 29 +++++-
.../group/GroupCoordinatorConcurrencyTest.scala | 14 +--
.../coordinator/group/GroupCoordinatorTest.scala | 54 +++++++----
.../group/GroupMetadataManagerTest.scala | 101 +++++++++++++++++----
.../coordinator/group/GroupMetadataTest.scala | 31 ++++---
10 files changed, 290 insertions(+), 183 deletions(-)
diff --git a/core/src/main/scala/kafka/common/OffsetAndMetadata.scala b/core/src/main/scala/kafka/common/OffsetAndMetadata.scala
new file mode 100644
index 0000000..5f0c080
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetAndMetadata.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.util.Optional
+
+case class OffsetAndMetadata(offset: Long,
+ leaderEpoch: Optional[Integer],
+ metadata: String,
+ commitTimestamp: Long,
+ expireTimestamp: Option[Long]) {
+
+
+ override def toString: String = {
+ s"OffsetAndMetadata(offset=$offset" +
+ s", leaderEpoch=$leaderEpoch" +
+ s", metadata=$metadata" +
+ s", commitTimestamp=$commitTimestamp" +
+ s", expireTimestamp=$expireTimestamp)"
+ }
+}
+
+object OffsetAndMetadata {
+ val NoMetadata: String = ""
+
+ def apply(offset: Long, metadata: String, commitTimestamp: Long): OffsetAndMetadata = {
+ OffsetAndMetadata(offset, Optional.empty(), metadata, commitTimestamp, None)
+ }
+
+ def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long): OffsetAndMetadata = {
+ OffsetAndMetadata(offset, Optional.empty(), metadata, commitTimestamp, Some(expireTimestamp))
+ }
+
+ def apply(offset: Long, leaderEpoch: Optional[Integer], metadata: String, commitTimestamp: Long): OffsetAndMetadata = {
+ OffsetAndMetadata(offset, leaderEpoch, metadata, commitTimestamp, None)
+ }
+}
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
deleted file mode 100644
index afe542c..0000000
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-import org.apache.kafka.common.protocol.Errors
-
-case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
- override def toString = "OffsetMetadata[%d,%s]"
- .format(offset,
- if (metadata != null && metadata.length > 0) metadata else "NO_METADATA")
-}
-
-object OffsetMetadata {
- val InvalidOffset: Long = -1L
- val NoMetadata: String = ""
-
- val InvalidOffsetMetadata = OffsetMetadata(OffsetMetadata.InvalidOffset, OffsetMetadata.NoMetadata)
-}
-
-case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
- commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
- expireTimestamp: Option[Long] = None) {
-
- def offset = offsetMetadata.offset
-
- def metadata = offsetMetadata.metadata
-
- override def toString = s"[$offsetMetadata,CommitTime $commitTimestamp,ExpirationTime ${expireTimestamp.getOrElse("_")}]"
-}
-
-object OffsetAndMetadata {
- def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, Some(expireTimestamp))
-
- def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)
-
- def apply(offset: Long, metadata: String) = new OffsetAndMetadata(OffsetMetadata(offset, metadata))
-
- def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
-}
-
-case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors = Errors.NONE) {
- def offset = offsetMetadata.offset
-
- def metadata = offsetMetadata.metadata
-
- override def toString = "[%s, Error=%s]".format(offsetMetadata, error)
-}
-
-object OffsetMetadataAndError {
- val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
- val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_LOAD_IN_PROGRESS)
- val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
- val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR)
- val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_NOT_AVAILABLE)
- val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
- val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
-
- def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE)
-
- def apply(error: Errors) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
-
- def apply(offset: Long, metadata: String, error: Errors) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
-}
-
-
-
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index dba8b4e..21f4a3d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
import kafka.common.{MessageFormatter, OffsetAndMetadata}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
@@ -460,7 +460,7 @@ class GroupMetadataManager(brokerId: Int,
// that commit offsets to Kafka.)
group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
- Optional.empty(), offsetAndMetadata.metadata, Errors.NONE)
+ offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
}
case Some(topicPartitions) =>
@@ -471,7 +471,7 @@ class GroupMetadataManager(brokerId: Int,
Optional.empty(), "", Errors.NONE)
case Some(offsetAndMetadata) =>
new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
- Optional.empty(), offsetAndMetadata.metadata, Errors.NONE)
+ offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
}
topicPartition -> partitionData
}.toMap
@@ -960,6 +960,16 @@ object GroupMetadataManager {
private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+ private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
+ new Field("offset", INT64),
+ new Field("leader_epoch", INT32),
+ new Field("metadata", STRING, "Associated metadata.", ""),
+ new Field("commit_timestamp", INT64))
+ private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
+ private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
+ private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
+ private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
+
private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
@@ -1019,7 +1029,6 @@ object GroupMetadataManager {
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
-
// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_KEY_SCHEMA,
@@ -1030,21 +1039,18 @@ object GroupMetadataManager {
private val OFFSET_VALUE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
- 2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2)
+ 2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
+ 3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
// map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(
0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
- private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 2.toShort
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
- private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(2)
- private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
-
private def schemaForKey(version: Int) = {
val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
schemaOpt match {
@@ -1053,7 +1059,7 @@ object GroupMetadataManager {
}
}
- private def schemaForOffset(version: Int) = {
+ private def schemaForOffsetValue(version: Int) = {
val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
@@ -1061,7 +1067,7 @@ object GroupMetadataManager {
}
}
- private def schemaForGroup(version: Int) = {
+ private def schemaForGroupValue(version: Int) = {
val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
@@ -1074,8 +1080,8 @@ object GroupMetadataManager {
*
* @return key for offset commit message
*/
- private[group] def offsetCommitKey(group: String, topicPartition: TopicPartition,
- versionId: Short = 0): Array[Byte] = {
+ private[group] def offsetCommitKey(group: String,
+ topicPartition: TopicPartition): Array[Byte] = {
val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
key.set(OFFSET_KEY_GROUP_FIELD, group)
key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
@@ -1113,27 +1119,34 @@ object GroupMetadataManager {
apiVersion: ApiVersion): Array[Byte] = {
// generate commit value according to schema version
val (version, value) = {
- if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
- // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
- (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
- else
- (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
- }
-
- if (version == 2) {
- value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
- value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
- value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
- } else {
- value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
- value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
- value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
- // version 1 has a non empty expireTimestamp field
- value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+ if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) {
+ val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1)
+ value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+ value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+ value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+ // version 1 has a non empty expireTimestamp field
+ value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
+ offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+ (1, value)
+ } else if (apiVersion < KAFKA_2_1_IV1) {
+ val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2)
+ value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
+ value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
+ value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
+ (2, value)
+ } else {
+ val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
+ value.set(OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
+ value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
+ offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+ value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
+ value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, offsetAndMetadata.commitTimestamp)
+ (3, value)
+ }
}
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
- byteBuffer.putShort(version)
+ byteBuffer.putShort(version.toShort)
value.writeTo(byteBuffer)
byteBuffer.array()
}
@@ -1157,7 +1170,7 @@ object GroupMetadataManager {
else if (apiVersion < KAFKA_2_1_IV0)
(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
else
- (2.toShort, new Struct(CURRENT_GROUP_VALUE_SCHEMA))
+ (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
}
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@@ -1242,7 +1255,7 @@ object GroupMetadataManager {
null
} else {
val version = buffer.getShort
- val valueSchema = schemaForOffset(version)
+ val valueSchema = schemaForOffsetValue(version)
val value = valueSchema.read(buffer)
if (version == 0) {
@@ -1264,6 +1277,14 @@ object GroupMetadataManager {
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
+ } else if (version == 3) {
+ val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
+ val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
+ val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
+ val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
+
+ val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
+ OffsetAndMetadata(offset, leaderEpochOpt, metadata, commitTimestamp)
} else {
throw new IllegalStateException(s"Unknown offset message version: $version")
}
@@ -1282,7 +1303,7 @@ object GroupMetadataManager {
null
} else {
val version = buffer.getShort
- val valueSchema = schemaForGroup(version)
+ val valueSchema = schemaForGroupValue(version)
val value = valueSchema.read(buffer)
if (version >= 0 && version <= 2) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 10119c8..d0be97f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import java.util.{Collections, Optional, Properties}
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
-import kafka.common.{OffsetAndMetadata, OffsetMetadata}
+import kafka.common.OffsetAndMetadata
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
@@ -340,9 +340,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
val currentTimestamp = time.milliseconds
val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
- val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
+ val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
- offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
+ offset = partitionData.offset,
+ leaderEpoch = partitionData.leaderEpoch,
+ metadata = metadata,
commitTimestamp = partitionData.timestamp match {
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
case customTimestamp => customTimestamp
@@ -1907,15 +1909,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
- val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
val currentTimestamp = time.milliseconds
- val defaultExpireTimestamp = offsetRetention + currentTimestamp
offsetsMap.map { case (topicPartition, partitionData) =>
- val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
+ val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
topicPartition -> new OffsetAndMetadata(
- offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
+ offset = partitionData.offset,
+ leaderEpoch = partitionData.leaderEpoch,
+ metadata = metadata,
commitTimestamp = currentTimestamp,
- expireTimestamp = Some(defaultExpireTimestamp))
+ expireTimestamp = None)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 4417028..522ca49 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,7 +14,7 @@ package kafka.api
import java.util
import java.util.regex.Pattern
-import java.util.{Collections, Locale, Properties}
+import java.util.{Collections, Locale, Optional, Properties}
import kafka.log.LogConfig
import kafka.server.KafkaConfig
@@ -503,7 +503,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.assign(List(tp).asJava)
// sync commit
- val syncMetadata = new OffsetAndMetadata(5, "foo")
+ val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
consumer.commitSync(Map((tp, syncMetadata)).asJava)
assertEquals(syncMetadata, consumer.committed(tp))
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 58059b4..ab14db4 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -18,7 +18,7 @@
package kafka.api
import java.lang.{Long => JLong}
-import java.util.Properties
+import java.util.{Optional, Properties}
import java.util.concurrent.TimeUnit
import kafka.integration.KafkaServerTestHarness
@@ -365,6 +365,31 @@ class TransactionsTest extends KafkaServerTestHarness {
}
@Test
+ def testOffsetMetadataInSendOffsetsToTransaction() = {
+ val tp = new TopicPartition(topic1, 0)
+ val groupId = "group"
+
+ val producer = transactionalProducers.head
+ val consumer = createReadCommittedConsumer(groupId)
+
+ consumer.subscribe(List(topic1).asJava)
+
+ producer.initTransactions()
+
+ producer.beginTransaction()
+ val offsetAndMetadata = new OffsetAndMetadata(110L, Optional.of(15), "some metadata")
+ producer.sendOffsetsToTransaction(Map(tp -> offsetAndMetadata).asJava, groupId)
+ producer.commitTransaction() // ok
+
+ // The call to commit the transaction may return before all markers are visible, so we initialize a second
+ // producer to ensure the transaction completes and the committed offsets are visible.
+ val producer2 = transactionalProducers(1)
+ producer2.initTransactions()
+
+ assertEquals(offsetAndMetadata, consumer.committed(tp))
+ }
+
+ @Test
def testFencingOnSend() {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -434,7 +459,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
- servers.foreach { case (server) =>
+ servers.foreach { server =>
error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
}
fail("Should not be able to send messages from a fenced producer.")
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index befd22a..ef014c8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -17,24 +17,25 @@
package kafka.coordinator.group
-import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.common.OffsetAndMetadata
import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
-import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.utils.Time
import org.easymock.EasyMock
import org.junit.Assert._
-import org.junit.{ After, Before, Test }
+import org.junit.{After, Before, Test}
import scala.collection._
import scala.concurrent.duration.Duration
-import scala.concurrent.{ Await, Future, Promise, TimeoutException }
+import scala.concurrent.{Await, Future, Promise, TimeoutException}
class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest[GroupMember] {
@@ -211,7 +212,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
val tp = new TopicPartition("topic", 0)
- val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
+ val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
groupCoordinator.handleCommitOffsets(member.groupId, member.memberId, member.generationId,
offsets, responseCallback)
}
@@ -224,7 +225,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
class CommitTxnOffsetsOperation extends CommitOffsetsOperation {
override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
val tp = new TopicPartition("topic", 0)
- val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
+ val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
val producerId = 1000L
val producerEpoch : Short = 2
// When transaction offsets are appended to the log, transactions may be scheduled for
@@ -309,4 +310,5 @@ object GroupCoordinatorConcurrencyTest {
@volatile var generationId: Int = -1
def groupId: String = group.groupId
}
+
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 608d7cc..9df16ad 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -17,6 +17,8 @@
package kafka.coordinator.group
+import java.util.Optional
+
import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
import kafka.utils._
@@ -138,7 +140,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val topicPartition = new TopicPartition("foo", 0)
var offsetCommitErrors = Map.empty[TopicPartition, Errors]
groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
- Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
+ Map(topicPartition -> offsetAndMetadata(15L)), result => { offsetCommitErrors = result })
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
// Heartbeat
@@ -436,7 +438,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val sessionTimeout = 1000
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
@@ -818,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testCommitOffsetFromUnknownGroup() {
val generationId = 1
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
@@ -827,7 +829,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testCommitOffsetWithDefaultGeneration() {
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
@@ -854,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// The simple offset commit should now fail
EasyMock.reset(replicaManager)
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
@@ -865,17 +867,25 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
- def testFetchOffsets() {
+ def testFetchOffsets(): Unit = {
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = 97L
+ val metadata = "some metadata"
+ val leaderEpoch = Optional.of[Integer](15)
+ val offsetAndMetadata = OffsetAndMetadata(offset, leaderEpoch, metadata, timer.time.milliseconds())
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offsetAndMetadata))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
assertEquals(Errors.NONE, error)
- assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+
+ val maybePartitionData = partitionData.get(tp)
+ assertTrue(maybePartitionData.isDefined)
+ assertEquals(offset, maybePartitionData.get.offset)
+ assertEquals(metadata, maybePartitionData.get.metadata)
+ assertEquals(leaderEpoch, maybePartitionData.get.leaderEpoch)
}
@Test
@@ -884,7 +894,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val groupId = ""
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
@@ -919,7 +929,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testBasicFetchTxnOffsets() {
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -946,7 +956,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testFetchTxnOffsetsWithAbort() {
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -970,7 +980,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testFetchTxnOffsetsIgnoreSpuriousCommit() {
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -1003,7 +1013,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// Marker for only one partition is received. That commit should be materialized while the other should not.
val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
- val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+ val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
val producerId = 1000L
val producerEpoch: Short = 3
@@ -1082,7 +1092,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// Each partition's offsets should be materialized when the corresponding producer's marker is received.
val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
- val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+ val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
val producerIds = List(1000L, 1005L)
val producerEpochs: Seq[Short] = List(3, 4)
@@ -1154,9 +1164,9 @@ class GroupCoordinatorTest extends JUnitSuite {
val tp1 = new TopicPartition("topic", 0)
val tp2 = new TopicPartition("topic", 1)
val tp3 = new TopicPartition("other-topic", 0)
- val offset1 = OffsetAndMetadata(15)
- val offset2 = OffsetAndMetadata(16)
- val offset3 = OffsetAndMetadata(17)
+ val offset1 = offsetAndMetadata(15)
+ val offset2 = offsetAndMetadata(16)
+ val offset3 = offsetAndMetadata(17)
assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
@@ -1179,7 +1189,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testCommitOffsetInCompletingRebalance() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
@@ -1425,7 +1435,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.reset(replicaManager)
val tp = new TopicPartition("topic", 0)
- val offset = OffsetAndMetadata(0)
+ val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
@@ -1730,4 +1740,8 @@ class GroupCoordinatorTest extends JUnitSuite {
groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
}
+ private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
+ OffsetAndMetadata(offset, "", timer.time.milliseconds())
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a0dfbda..b48f297 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.group
-import kafka.api.{ApiVersion, KAFKA_1_1_IV0, KAFKA_2_1_IV0}
+import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo}
@@ -35,6 +35,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
import org.junit.{Before, Test}
import java.nio.ByteBuffer
+import java.util.Optional
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
@@ -932,7 +933,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+ val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
@@ -974,7 +975,8 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+ val offsetAndMetadata = OffsetAndMetadata(offset, "", time.milliseconds())
+ val offsets = immutable.Map(topicPartition -> offsetAndMetadata)
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@@ -996,7 +998,7 @@ class GroupMetadataManagerTest {
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
assertTrue(group.hasOffsets)
assertFalse(group.allOffsets.isEmpty)
- assertEquals(Some(OffsetAndMetadata(offset)), group.offset(topicPartition))
+ assertEquals(Some(offsetAndMetadata), group.offset(topicPartition))
EasyMock.verify(replicaManager)
}
@@ -1014,7 +1016,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+ val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@@ -1053,7 +1055,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+ val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@@ -1091,7 +1093,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+ val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
EasyMock.replay(replicaManager)
@@ -1133,7 +1135,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+ val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@@ -1329,7 +1331,7 @@ class GroupMetadataManagerTest {
// expire the offset after 1 millisecond
val startMs = time.milliseconds
val offsets = immutable.Map(
- topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+ topicPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "", startMs, Some(startMs + 1)),
topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
mockGetPartition()
@@ -1633,7 +1635,7 @@ class GroupMetadataManagerTest {
)
val apiVersion = KAFKA_1_1_IV0
- val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTimeOpt = Some(100))
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
@@ -1673,7 +1675,7 @@ class GroupMetadataManagerTest {
new TopicPartition("bar", 0) -> 8992L
)
- val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTime = Some(100))
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTimeOpt = Some(100))
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
@@ -1700,6 +1702,70 @@ class GroupMetadataManagerTest {
}
}
+ @Test
+ def testSerdeOffsetCommitValue(): Unit = {
+ val offsetAndMetadata = OffsetAndMetadata(
+ offset = 537L,
+ leaderEpoch = Optional.of(15),
+ metadata = "metadata",
+ commitTimestamp = time.milliseconds(),
+ expireTimestamp = None)
+
+ def verifySerde(apiVersion: ApiVersion, expectedOffsetCommitValueVersion: Int): Unit = {
+ val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
+ val buffer = ByteBuffer.wrap(bytes)
+
+ assertEquals(expectedOffsetCommitValueVersion, buffer.getShort(0).toInt)
+
+ val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+ assertEquals(offsetAndMetadata.offset, deserializedOffsetAndMetadata.offset)
+ assertEquals(offsetAndMetadata.metadata, deserializedOffsetAndMetadata.metadata)
+ assertEquals(offsetAndMetadata.commitTimestamp, deserializedOffsetAndMetadata.commitTimestamp)
+
+ // Serialization drops the leader epoch silently if an older inter-broker protocol is in use
+ val expectedLeaderEpoch = if (expectedOffsetCommitValueVersion >= 3)
+ offsetAndMetadata.leaderEpoch
+ else
+ Optional.empty()
+
+ assertEquals(expectedLeaderEpoch, deserializedOffsetAndMetadata.leaderEpoch)
+ }
+
+ for (version <- ApiVersion.allVersions) {
+ val expectedSchemaVersion = version match {
+ case v if v < KAFKA_2_1_IV0 => 1
+ case v if v < KAFKA_2_1_IV1 => 2
+ case _ => 3
+ }
+ verifySerde(version, expectedSchemaVersion)
+ }
+ }
+
+ @Test
+ def testSerdeOffsetCommitValueWithExpireTimestamp(): Unit = {
+ // If expire timestamp is set, we should always use version 1 of the offset commit
+ // value schema since later versions do not support it
+
+ val offsetAndMetadata = OffsetAndMetadata(
+ offset = 537L,
+ leaderEpoch = Optional.empty(),
+ metadata = "metadata",
+ commitTimestamp = time.milliseconds(),
+ expireTimestamp = Some(time.milliseconds() + 1000))
+
+ def verifySerde(apiVersion: ApiVersion): Unit = {
+ val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
+ val buffer = ByteBuffer.wrap(bytes)
+ assertEquals(1, buffer.getShort(0).toInt)
+
+ val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+ assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
+ }
+
+ for (version <- ApiVersion.allVersions)
+ verifySerde(version)
+ }
+
private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
@@ -1804,14 +1870,15 @@ class GroupMetadataManagerTest {
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
groupId: String = groupId,
apiVersion: ApiVersion = ApiVersion.latestVersion,
- retentionTime: Option[Long] = None): Seq[SimpleRecord] = {
+ retentionTimeOpt: Option[Long] = None): Seq[SimpleRecord] = {
committedOffsets.map { case (topicPartition, offset) =>
- val offsetAndMetadata = retentionTime match {
- case Some(timestamp) =>
- val commitTimestamp = time.milliseconds()
- OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + timestamp)
+ val commitTimestamp = time.milliseconds()
+ val offsetAndMetadata = retentionTimeOpt match {
+ case Some(retentionTimeMs) =>
+ val expirationTime = commitTimestamp + retentionTimeMs
+ OffsetAndMetadata(offset, "", commitTimestamp, expirationTime)
case None =>
- OffsetAndMetadata(offset)
+ OffsetAndMetadata(offset, "", commitTimestamp)
}
val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index ac12804..f45a6e2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -293,7 +293,7 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommit(): Unit = {
val partition = new TopicPartition("foo", 0)
- val offset = OffsetAndMetadata(37)
+ val offset = offsetAndMetadata(37)
val commitRecordOffset = 3
group.prepareOffsetCommit(Map(partition -> offset))
@@ -308,7 +308,7 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommitFailure(): Unit = {
val partition = new TopicPartition("foo", 0)
- val offset = OffsetAndMetadata(37)
+ val offset = offsetAndMetadata(37)
group.prepareOffsetCommit(Map(partition -> offset))
assertTrue(group.hasOffsets)
@@ -322,8 +322,8 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommitFailureWithAnotherPending(): Unit = {
val partition = new TopicPartition("foo", 0)
- val firstOffset = OffsetAndMetadata(37)
- val secondOffset = OffsetAndMetadata(57)
+ val firstOffset = offsetAndMetadata(37)
+ val secondOffset = offsetAndMetadata(57)
group.prepareOffsetCommit(Map(partition -> firstOffset))
assertTrue(group.hasOffsets)
@@ -344,8 +344,8 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommitWithAnotherPending(): Unit = {
val partition = new TopicPartition("foo", 0)
- val firstOffset = OffsetAndMetadata(37)
- val secondOffset = OffsetAndMetadata(57)
+ val firstOffset = offsetAndMetadata(37)
+ val secondOffset = offsetAndMetadata(57)
group.prepareOffsetCommit(Map(partition -> firstOffset))
assertTrue(group.hasOffsets)
@@ -367,8 +367,8 @@ class GroupMetadataTest extends JUnitSuite {
def testConsumerBeatsTransactionalOffsetCommit(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
- val txnOffsetCommit = OffsetAndMetadata(37)
- val consumerOffsetCommit = OffsetAndMetadata(57)
+ val txnOffsetCommit = offsetAndMetadata(37)
+ val consumerOffsetCommit = offsetAndMetadata(57)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasOffsets)
@@ -392,8 +392,8 @@ class GroupMetadataTest extends JUnitSuite {
def testTransactionBeatsConsumerOffsetCommit(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
- val txnOffsetCommit = OffsetAndMetadata(37)
- val consumerOffsetCommit = OffsetAndMetadata(57)
+ val txnOffsetCommit = offsetAndMetadata(37)
+ val consumerOffsetCommit = offsetAndMetadata(57)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasOffsets)
@@ -419,8 +419,8 @@ class GroupMetadataTest extends JUnitSuite {
def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
- val txnOffsetCommit = OffsetAndMetadata(37)
- val consumerOffsetCommit = OffsetAndMetadata(57)
+ val txnOffsetCommit = offsetAndMetadata(37)
+ val consumerOffsetCommit = offsetAndMetadata(57)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasOffsets)
@@ -447,7 +447,7 @@ class GroupMetadataTest extends JUnitSuite {
def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
- val txnOffsetCommit = OffsetAndMetadata(37)
+ val txnOffsetCommit = offsetAndMetadata(37)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
@@ -471,4 +471,9 @@ class GroupMetadataTest extends JUnitSuite {
}
assertTrue(group.is(targetState))
}
+
+ private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
+ OffsetAndMetadata(offset, "", Time.SYSTEM.milliseconds())
+ }
+
}