You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/22 00:19:28 UTC
[kafka] branch trunk updated: KAFKA-4682;
Revise expiration semantics of consumer group offsets (KIP-211 -
Part 1) (#4896)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 418a91b KAFKA-4682; Revise expiration semantics of consumer group offsets (KIP-211 - Part 1) (#4896)
418a91b is described below
commit 418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Thu Jun 21 17:19:24 2018 -0700
KAFKA-4682; Revise expiration semantics of consumer group offsets (KIP-211 - Part 1) (#4896)
This patch implements the improved offset expiration semantics proposed in KIP-211. Committed offsets are not expired as long as the group is active. Once all members have left the group, its offsets expire after the timeout configured by `offsets.retention.minutes`, measured from the time the group became empty. Note that the optimization for early expiration of unsubscribed topics will be implemented in a separate patch.
---
.../consumer/internals/ConsumerCoordinator.java | 3 +-
.../kafka/common/requests/OffsetCommitRequest.java | 20 +-
.../common/requests/OffsetCommitResponse.java | 4 +-
.../kafka/common/requests/RequestResponseTest.java | 14 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 11 +-
.../kafka/common/OffsetMetadataAndError.scala | 6 +-
.../kafka/coordinator/group/GroupCoordinator.scala | 6 +-
.../kafka/coordinator/group/GroupMetadata.scala | 62 +++-
.../coordinator/group/GroupMetadataManager.scala | 151 ++++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 30 +-
.../main/scala/kafka/tools/ConsoleConsumer.scala | 4 +
.../main/scala/kafka/tools/DumpLogSegments.scala | 4 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/api/ApiVersionTest.scala | 3 +
.../group/GroupMetadataManagerTest.scala | 358 ++++++++++++++++++---
.../coordinator/group/GroupMetadataTest.scala | 5 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
docs/upgrade.html | 19 +-
18 files changed, 545 insertions(+), 159 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9c19af1..b484e11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -798,8 +798,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
setGenerationId(generation.generationId).
- setMemberId(generation.memberId).
- setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
+ setMemberId(generation.memberId);
log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 570c4d5..8a51e84 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -111,9 +111,15 @@ public class OffsetCommitRequest extends AbstractRequest {
*/
private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
+ private static final Schema OFFSET_COMMIT_REQUEST_V5 = new Schema(
+ GROUP_ID,
+ GENERATION_ID,
+ MEMBER_ID,
+ new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets."));
+
public static Schema[] schemaVersions() {
return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
- OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4};
+ OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4, OFFSET_COMMIT_REQUEST_V5};
}
// default values for the current version
@@ -166,7 +172,6 @@ public class OffsetCommitRequest extends AbstractRequest {
private final Map<TopicPartition, PartitionData> offsetData;
private String memberId = DEFAULT_MEMBER_ID;
private int generationId = DEFAULT_GENERATION_ID;
- private long retentionTime = DEFAULT_RETENTION_TIME;
public Builder(String groupId, Map<TopicPartition, PartitionData> offsetData) {
super(ApiKeys.OFFSET_COMMIT);
@@ -184,11 +189,6 @@ public class OffsetCommitRequest extends AbstractRequest {
return this;
}
- public Builder setRetentionTime(long retentionTime) {
- this.retentionTime = retentionTime;
- return this;
- }
-
@Override
public OffsetCommitRequest build(short version) {
switch (version) {
@@ -199,8 +199,8 @@ public class OffsetCommitRequest extends AbstractRequest {
case 2:
case 3:
case 4:
- long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
- return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
+ case 5:
+ return new OffsetCommitRequest(groupId, generationId, memberId, DEFAULT_RETENTION_TIME, offsetData, version);
default:
throw new UnsupportedVersionException("Unsupported version " + version);
}
@@ -213,7 +213,6 @@ public class OffsetCommitRequest extends AbstractRequest {
append(", groupId=").append(groupId).
append(", memberId=").append(memberId).
append(", generationId=").append(generationId).
- append(", retentionTime=").append(retentionTime).
append(", offsetData=").append(offsetData).
append(")");
return bld.toString();
@@ -316,6 +315,7 @@ public class OffsetCommitRequest extends AbstractRequest {
return new OffsetCommitResponse(responseData);
case 3:
case 4:
+ case 5:
return new OffsetCommitResponse(throttleTimeMs, responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 0b0b228..c79bc57 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -85,9 +85,11 @@ public class OffsetCommitResponse extends AbstractResponse {
*/
private static final Schema OFFSET_COMMIT_RESPONSE_V4 = OFFSET_COMMIT_RESPONSE_V3;
+ private static final Schema OFFSET_COMMIT_RESPONSE_V5 = OFFSET_COMMIT_RESPONSE_V4;
+
public static Schema[] schemaVersions() {
return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
- OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4};
+ OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4, OFFSET_COMMIT_RESPONSE_V5};
}
private final Map<TopicPartition, Errors> responseData;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6e705d2..da61398 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -141,9 +141,6 @@ public class RequestResponseTest {
checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException());
checkResponse(createMetadataResponse(), 4);
checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException());
- checkRequest(createOffsetCommitRequest(2));
- checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
- checkResponse(createOffsetCommitResponse(), 0);
checkRequest(OffsetFetchRequest.forAllPartitions("group1"));
checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"));
checkRequest(createOffsetFetchRequest(0));
@@ -210,6 +207,16 @@ public class RequestResponseTest {
checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException());
checkRequest(createOffsetCommitRequest(1));
checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException());
+ checkRequest(createOffsetCommitRequest(2));
+ checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
+ checkRequest(createOffsetCommitRequest(3));
+ checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException());
+ checkRequest(createOffsetCommitRequest(4));
+ checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException());
+ checkResponse(createOffsetCommitResponse(), 4);
+ checkRequest(createOffsetCommitRequest(5));
+ checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException());
+ checkResponse(createOffsetCommitResponse(), 5);
checkRequest(createJoinGroupRequest(0));
checkRequest(createUpdateMetadataRequest(0, null));
checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException());
@@ -817,7 +824,6 @@ public class RequestResponseTest {
return new OffsetCommitRequest.Builder("group1", commitData)
.setGenerationId(100)
.setMemberId("consumer1")
- .setRetentionTime(1000000)
.build((short) version);
}
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 9ed6432..485f2bd 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -73,7 +73,9 @@ object ApiVersion {
// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
KAFKA_2_0_IV0,
// Introduced ApiVersionsRequest V2 via KIP-219
- KAFKA_2_0_IV1
+ KAFKA_2_0_IV1,
+ // Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211)
+ KAFKA_2_1_IV0
)
// Map keys are the union of the short and full versions
@@ -248,4 +250,11 @@ case object KAFKA_2_0_IV1 extends DefaultApiVersion {
val subVersion = "IV1"
val recordVersion = RecordVersion.V2
val id: Int = 16
+}
+
+case object KAFKA_2_1_IV0 extends DefaultApiVersion {
+ val shortVersion: String = "2.1"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V2
+ val id: Int = 17
}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 2cf9bb4..afe542c 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -34,17 +34,17 @@ object OffsetMetadata {
case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
- expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {
+ expireTimestamp: Option[Long] = None) {
def offset = offsetMetadata.offset
def metadata = offsetMetadata.metadata
- override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
+ override def toString = s"[$offsetMetadata,CommitTime $commitTimestamp,ExpirationTime ${expireTimestamp.getOrElse("_")}]"
}
object OffsetAndMetadata {
- def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, expireTimestamp)
+ def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, Some(expireTimestamp))
def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 9748e17..2cedacd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -125,7 +125,7 @@ class GroupCoordinator(val brokerId: Int,
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
- val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+ val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
@@ -451,7 +451,7 @@ class GroupCoordinator(val brokerId: Int,
case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
case None =>
val group = groupManager.getGroup(groupId).getOrElse {
- groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+ groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
}
doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
}
@@ -469,7 +469,7 @@ class GroupCoordinator(val brokerId: Int,
case None =>
if (generationId < 0) {
// the group is not relying on Kafka for group management, so allow the commit
- val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+ val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
offsetMetadata, responseCallback)
} else {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 2b9c91f..d729449 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.common.OffsetAndMetadata
import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time
import scala.collection.{Seq, immutable, mutable}
@@ -118,12 +119,15 @@ private object GroupMetadata {
protocolType: String,
protocol: String,
leaderId: String,
- members: Iterable[MemberMetadata]): GroupMetadata = {
- val group = new GroupMetadata(groupId, initialState)
+ currentStateTimestamp: Option[Long],
+ members: Iterable[MemberMetadata],
+ time: Time): GroupMetadata = {
+ val group = new GroupMetadata(groupId, initialState, time)
group.generationId = generationId
group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
group.protocol = Option(protocol)
group.leaderId = Option(leaderId)
+ group.currentStateTimestamp = currentStateTimestamp
members.foreach(group.add)
group
}
@@ -167,10 +171,11 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
* 3. leader id
*/
@nonthreadsafe
-private[group] class GroupMetadata(val groupId: String, initialState: GroupState) extends Logging {
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging {
private[group] val lock = new ReentrantLock
private var state: GroupState = initialState
+ var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
var protocolType: Option[String] = None
var generationId = 0
private var leaderId: Option[String] = None
@@ -195,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
def leaderOrNull: String = leaderId.orNull
def protocolOrNull: String = protocol.orNull
+ def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1)
def add(member: MemberMetadata) {
if (members.isEmpty)
@@ -240,6 +246,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def transitionTo(groupState: GroupState) {
assertValidTransition(groupState)
state = groupState
+ currentStateTimestamp = Some(time.milliseconds())
}
def selectProtocol: String = {
@@ -434,18 +441,51 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}.toMap
}
- def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
- val expiredOffsets = offsets
- .filter {
+ def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
+
+ def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
+ offsets.filter {
case (topicPartition, commitRecordMetadataAndOffset) =>
- commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
- }
- .map {
+ !pendingOffsetCommits.contains(topicPartition) && {
+ commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
+ case None =>
+ // current version with no per partition retention
+ currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
+ case Some(expireTimestamp) =>
+ // older versions with explicit expire_timestamp field => old expiration semantics is used
+ currentTimestamp >= expireTimestamp
+ }
+ }
+ }.map {
case (topicPartition, commitRecordOffsetAndMetadata) =>
(topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
- }
+ }.toMap
+ }
+
+ val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {
+ case Some(_) if is(Empty) =>
+ // no consumer exists in the group =>
+ // - if current state timestamp exists and retention period has passed since group became Empty,
+ // expire all offsets with no pending offset commit;
+ // - if there is no current state timestamp (old group metadata schema) and retention period has passed
+ // since the last commit timestamp, expire the offset
+ getExpiredOffsets(commitRecordMetadataAndOffset =>
+ currentStateTimestamp.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp))
+
+ case None =>
+ // protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only
+ // expire offsets with no pending offset commit that retention period has passed since their last commit
+ getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)
+
+ case _ =>
+ Map()
+ }
+
+ if (expiredOffsets.nonEmpty)
+ debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")
+
offsets --= expiredOffsets.keySet
- expiredOffsets.toMap
+ expiredOffsets
}
def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 02ba13a..6bd0a5a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0}
import kafka.common.{MessageFormatter, OffsetAndMetadata}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
@@ -197,18 +197,11 @@ class GroupMetadataManager(brokerId: Int,
responseCallback: Errors => Unit): Unit = {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
- val groupMetadataValueVersion = {
- if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
- 0.toShort
- else
- GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
- }
-
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val key = GroupMetadataManager.groupMetadataKey(group.groupId)
- val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)
+ val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
@@ -330,7 +323,7 @@ class GroupMetadataManager(brokerId: Int,
val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
- val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+ val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
new SimpleRecord(timestamp, key, value)
}
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
@@ -580,7 +573,7 @@ class GroupMetadataManager(brokerId: Int,
case groupMetadataKey: GroupMetadataKey =>
// load group metadata
val groupId = groupMetadataKey.key
- val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
if (groupMetadata != null) {
removedGroups.remove(groupId)
loadedGroups.put(groupId, groupMetadata)
@@ -630,7 +623,7 @@ class GroupMetadataManager(brokerId: Int,
// load groups which store offsets in kafka, but which have no active members and thus no group
// metadata stored in the log
(emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
@@ -653,18 +646,8 @@ class GroupMetadataManager(brokerId: Int,
pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
// offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
// view of the group's offsets
- val loadedOffsets = offsets.mapValues { case CommitRecordMetadataAndOffset(commitRecordOffset, offsetAndMetadata) =>
- // special handling for version 0:
- // set the expiration time stamp as commit time stamp + server default retention time
- val updatedOffsetAndMetadata =
- if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
- offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs)
- else
- offsetAndMetadata
- CommitRecordMetadataAndOffset(commitRecordOffset, updatedOffsetAndMetadata)
- }
- trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
- group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets.toMap)
+ trace(s"Initialized offsets $offsets for group ${group.groupId}")
+ group.initializeOffsets(offsets, pendingTransactionalOffsets.toMap)
val currentGroup = addGroup(group)
if (group != currentGroup)
@@ -711,11 +694,11 @@ class GroupMetadataManager(brokerId: Int,
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
- val startMs = time.milliseconds()
- val offsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
- group.removeExpiredOffsets(time.milliseconds())
+ val currentTimestamp = time.milliseconds()
+ val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
+ group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
})
- info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+ info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
}
/**
@@ -917,7 +900,7 @@ class GroupMetadataManager(brokerId: Int,
* -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
*
* key version 2: group metadata
- * -> value version 0: [protocol_type, generation, protocol, leader, members]
+ * -> value version 0: [protocol_type, generation, protocol, leader, members]
*/
object GroupMetadataManager {
@@ -947,6 +930,13 @@ object GroupMetadataManager {
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
+ private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
+ new Field("metadata", STRING, "Associated metadata.", ""),
+ new Field("commit_timestamp", INT64))
+ private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
+ private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
+ private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+
private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
@@ -975,10 +965,13 @@ object GroupMetadataManager {
new Field(SUBSCRIPTION_KEY, BYTES),
new Field(ASSIGNMENT_KEY, BYTES))
+ private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+
private val PROTOCOL_TYPE_KEY = "protocol_type"
private val GENERATION_KEY = "generation"
private val PROTOCOL_KEY = "protocol"
private val LEADER_KEY = "leader"
+ private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
private val MEMBERS_KEY = "members"
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
@@ -995,6 +988,14 @@ object GroupMetadataManager {
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+ private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
+ new Field(PROTOCOL_TYPE_KEY, STRING),
+ new Field(GENERATION_KEY, INT32),
+ new Field(PROTOCOL_KEY, NULLABLE_STRING),
+ new Field(LEADER_KEY, NULLABLE_STRING),
+ new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
+ new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
+
// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -1005,19 +1006,20 @@ object GroupMetadataManager {
// map of version of offset value schemas
private val OFFSET_VALUE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
- 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
- private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
+ 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
+ 2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2)
// map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(
0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
- 1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
- private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
+ 1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
+ 2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+ private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 2.toShort
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
- private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+ private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(2)
private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
private def schemaForKey(version: Int) = {
@@ -1081,17 +1083,34 @@ object GroupMetadataManager {
* Generates the payload for offset commit message from given offset and metadata
*
* @param offsetAndMetadata consumer's current offset and metadata
+ * @param apiVersion the api version
* @return payload for offset commit message
*/
- private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
- // generate commit value with schema version 1
- val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
- value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
- value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
- value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
- value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
+ private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
+ apiVersion: ApiVersion): Array[Byte] = {
+ // generate commit value according to schema version
+ val (version, value) = {
+ if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
+ // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
+ (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
+ else
+ (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
+ }
+
+ if (version == 2) {
+ value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
+ value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
+ value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
+ } else {
+ value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+ value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+ value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+ // version 1 has a non empty expireTimestamp field
+ value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
+ }
+
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
- byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+ byteBuffer.putShort(version)
value.writeTo(byteBuffer)
byteBuffer.array()
}
@@ -1102,19 +1121,30 @@ object GroupMetadataManager {
*
* @param groupMetadata current group metadata
* @param assignment the assignment for the rebalancing generation
- * @param version the version of the value message to use
+ * @param apiVersion the api version
* @return payload for offset commit message
*/
private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
assignment: Map[String, Array[Byte]],
- version: Short = 0): Array[Byte] = {
- val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+ apiVersion: ApiVersion): Array[Byte] = {
+
+ val (version, value) = {
+ if (apiVersion < KAFKA_0_10_1_IV0)
+ (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
+ else if (apiVersion < KAFKA_2_1_IV0)
+ (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
+ else
+ (2.toShort, new Struct(CURRENT_GROUP_VALUE_SCHEMA))
+ }
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
value.set(GENERATION_KEY, groupMetadata.generationId)
value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
value.set(LEADER_KEY, groupMetadata.leaderOrNull)
+ if (version >= 2)
+ value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
+
val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
val memberStruct = value.instance(MEMBERS_KEY)
memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
@@ -1174,7 +1204,7 @@ object GroupMetadataManager {
GroupMetadataKey(version, group)
} else {
- throw new IllegalStateException("Unknown version " + version + " for group metadata message")
+ throw new IllegalStateException(s"Unknown group metadata message version: $version")
}
}
@@ -1205,8 +1235,14 @@ object GroupMetadataManager {
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+ } else if (version == 2) {
+ val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
+ val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
+ val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
+
+ OffsetAndMetadata(offset, metadata, commitTimestamp)
} else {
- throw new IllegalStateException("Unknown offset message version")
+ throw new IllegalStateException(s"Unknown offset message version: $version")
}
}
}
@@ -1215,9 +1251,10 @@ object GroupMetadataManager {
* Decodes the group metadata messages' payload and retrieves its member metadata from it
*
* @param buffer input byte-buffer
+ * @param time the time instance to use
* @return a group metadata object from the message
*/
- def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+ def readGroupMessageValue(groupId: String, buffer: ByteBuffer, time: Time): GroupMetadata = {
if (buffer == null) { // tombstone
null
} else {
@@ -1225,13 +1262,23 @@ object GroupMetadataManager {
val valueSchema = schemaForGroup(version)
val value = valueSchema.read(buffer)
- if (version == 0 || version == 1) {
+ if (version >= 0 && version <= 2) {
val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
val memberMetadataArray = value.getArray(MEMBERS_KEY)
val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+ val currentStateTimestamp: Option[Long] = version match {
+ case version if version == 2 =>
+ if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
+ val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
+ if (timestamp == -1) None else Some(timestamp)
+ } else
+ None
+ case _ =>
+ None
+ }
val members = memberMetadataArray.map { memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
@@ -1247,9 +1294,9 @@ object GroupMetadataManager {
member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
member
}
- GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, members)
+ GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, currentStateTimestamp, members, time)
} else {
- throw new IllegalStateException("Unknown group metadata message version")
+ throw new IllegalStateException(s"Unknown group metadata message version: $version")
}
}
}
@@ -1287,7 +1334,7 @@ object GroupMetadataManager {
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
- else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+ else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value), Time.SYSTEM).toString
output.write(groupId.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c88be9..de9e0bf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -332,33 +332,25 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// for version 1 and beyond store offsets in offset manager
- // compute the retention time based on the request version:
- // if it is v1 or not specified by user, we can use the default retention
- val offsetRetention =
- if (header.apiVersion <= 1 ||
- offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
- groupCoordinator.offsetConfig.offsetsRetentionMs
- else
- offsetCommitRequest.retentionTime
-
// commit timestamp is always set to now.
// "default" expiration timestamp is now + retention (and retention may be overridden if v2)
// expire timestamp is computed differently for v1 and v2.
- // - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+ // - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
// - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
- // - If v2 we use the default expiration timestamp
+ // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
+ // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
val currentTimestamp = time.milliseconds
- val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
- commitTimestamp = currentTimestamp,
- expireTimestamp = {
- if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
- defaultExpireTimestamp
- else
- offsetRetention + partitionData.timestamp
+ commitTimestamp = partitionData.timestamp match {
+ case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
+ case customTimestamp => customTimestamp
+ },
+ expireTimestamp = offsetCommitRequest.retentionTime match {
+ case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+ case retentionTime => Some(currentTimestamp + retentionTime)
}
)
}
@@ -1912,7 +1904,7 @@ class KafkaApis(val requestChannel: RequestChannel,
topicPartition -> new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
commitTimestamp = currentTimestamp,
- expireTimestamp = defaultExpireTimestamp)
+ expireTimestamp = Some(defaultExpireTimestamp))
}
}
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index c55f6c4..7e2c564 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -368,6 +368,10 @@ object ConsoleConsumer extends Logging {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
case None =>
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
+ // By default, avoid unnecessary expansion of the coordinator cache since
+ // the auto-generated group and its offsets are not intended to be used again
+ if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
groupIdPassed = false
}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 17fbd8f..1792c7b 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -29,7 +29,7 @@ import kafka.utils._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
import scala.collection.{Map, mutable}
import scala.collection.mutable.ArrayBuffer
@@ -321,7 +321,7 @@ object DumpLogSegments {
private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer) = {
val groupId = groupMetadataKey.key
- val group = GroupMetadataManager.readGroupMessageValue(groupId, payload)
+ val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM)
val protocolType = group.protocolType.getOrElse("")
val assignment = group.allMemberMetadata.map { member =>
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f4ff8e1..69b6525 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -334,7 +334,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createOffsetCommitRequest = {
new requests.OffsetCommitRequest.Builder(
group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
- setMemberId("").setGenerationId(1).setRetentionTime(1000).
+ setMemberId("").setGenerationId(1).
build()
}
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 32e4085..2befc8f 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -79,6 +79,9 @@ class ApiVersionTest {
assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0"))
assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0"))
assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1"))
+
+ assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1"))
+ assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3bfacab..77e6fdc 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,20 +17,22 @@
package kafka.coordinator.group
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, KAFKA_1_1_IV0, KAFKA_2_1_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
import kafka.utils.TestUtils.fail
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
import org.junit.{Before, Test}
import java.nio.ByteBuffer
@@ -52,6 +54,7 @@ class GroupMetadataManagerTest {
var scheduler: KafkaScheduler = null
var zkClient: KafkaZkClient = null
var partition: Partition = null
+ var defaultOffsetRetentionMs = Long.MaxValue
val groupId = "foo"
val groupPartitionId = 0
@@ -75,6 +78,8 @@ class GroupMetadataManagerTest {
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+ defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
+
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
@@ -506,7 +511,7 @@ class GroupMetadataManagerTest {
// group is owned but does not exist yet
assertTrue(groupMetadataManager.groupNotExists(groupId))
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
// group is owned but not Dead
@@ -616,6 +621,7 @@ class GroupMetadataManagerTest {
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+ assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
}
}
@@ -729,9 +735,9 @@ class GroupMetadataManagerTest {
@Test
def testAddGroup() {
- val group = new GroupMetadata("foo", initialState = Empty)
+ val group = new GroupMetadata("foo", Empty, time)
assertEquals(group, groupMetadataManager.addGroup(group))
- assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", initialState = Empty)))
+ assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", Empty, time)))
}
@Test
@@ -739,7 +745,7 @@ class GroupMetadataManagerTest {
val generation = 27
val protocolType = "consumer"
- val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
+ val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
groupMetadataManager.addGroup(group)
val capturedRecords = expectAppendMessage(Errors.NONE)
@@ -758,7 +764,7 @@ class GroupMetadataManagerTest {
assertEquals(1, records.size)
val record = records.head
- val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
assertTrue(groupMetadata.is(Empty))
assertEquals(generation, groupMetadata.generationId)
assertEquals(Some(protocolType), groupMetadata.protocolType)
@@ -766,7 +772,7 @@ class GroupMetadataManagerTest {
@Test
def testStoreEmptySimpleGroup() {
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val capturedRecords = expectAppendMessage(Errors.NONE)
@@ -787,7 +793,7 @@ class GroupMetadataManagerTest {
assertEquals(1, records.size)
val record = records.head
- val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
assertTrue(groupMetadata.is(Empty))
assertEquals(0, groupMetadata.generationId)
assertEquals(None, groupMetadata.protocolType)
@@ -809,7 +815,7 @@ class GroupMetadataManagerTest {
private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
EasyMock.reset(replicaManager)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
expectAppendMessage(appendError)
@@ -832,7 +838,7 @@ class GroupMetadataManagerTest {
val clientId = "clientId"
val clientHost = "localhost"
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@@ -863,7 +869,7 @@ class GroupMetadataManagerTest {
val clientId = "clientId"
val clientHost = "localhost"
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
@@ -893,7 +899,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -935,7 +941,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -975,7 +981,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1014,7 +1020,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1052,7 +1058,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1094,7 +1100,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1132,7 +1138,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
// expire the offset after 1 millisecond
@@ -1185,7 +1191,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
group.generationId = 5
@@ -1233,7 +1239,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
group.generationId = 5
@@ -1287,7 +1293,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
// expire the offset after 1 millisecond
@@ -1348,31 +1354,39 @@ class GroupMetadataManagerTest {
}
@Test
- def testExpireOffsetsWithActiveGroup() {
+ def testOffsetExpirationSemantics() {
val memberId = "memberId"
val clientId = "clientId"
val clientHost = "localhost"
- val topicPartition1 = new TopicPartition("foo", 0)
- val topicPartition2 = new TopicPartition("foo", 1)
+ val topic = "foo"
+ val topicPartition1 = new TopicPartition(topic, 0)
+ val topicPartition2 = new TopicPartition(topic, 1)
+ val topicPartition3 = new TopicPartition(topic, 2)
val offset = 37
groupMetadataManager.addPartitionOwnership(groupPartitionId)
- val group = new GroupMetadata(groupId, initialState = Empty)
+ val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
+ val subscription = new Subscription(List(topic).asJava)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
- protocolType, List(("protocol", Array[Byte]())))
+ protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
member.awaitingJoinCallback = _ => ()
group.add(member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
- // expire the offset after 1 millisecond
val startMs = time.milliseconds
+ // old clients, expiry timestamp is explicitly set
+ val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 1)
+ val tp2OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 3)
+ // new clients, no per-partition expiry timestamp, offsets of group expire together
+ val tp3OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
val offsets = immutable.Map(
- topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
- topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+ topicPartition1 -> tp1OffsetAndMetadata,
+ topicPartition2 -> tp2OffsetAndMetadata,
+ topicPartition3 -> tp3OffsetAndMetadata)
mockGetPartition()
expectAppendMessage(Errors.NONE)
@@ -1389,8 +1403,26 @@ class GroupMetadataManagerTest {
assertFalse(commitErrors.isEmpty)
assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
- // expire all of the offsets
- time.sleep(4)
+ // do not expire any offset even though expiration timestamp is reached for one (due to group still being active)
+ time.sleep(2)
+
+ groupMetadataManager.cleanupGroupMetadata()
+
+ // group and offsets should still be there
+ assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+ assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
+ assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
+ assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
+
+ var cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition1).map(_.offset))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+
+ group.transitionTo(PreparingRebalance)
+ group.transitionTo(Empty)
// expect the offset tombstone
EasyMock.reset(partition)
@@ -1401,16 +1433,245 @@ class GroupMetadataManagerTest {
groupMetadataManager.cleanupGroupMetadata()
- // group should still be there, but the offsets should be gone
+ // group is empty now, only one offset should expire
+ assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+ assertEquals(None, group.offset(topicPartition1))
+ assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
+ assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
+
+ cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+
+ time.sleep(2)
+
+ // expect the offset tombstone
+ EasyMock.reset(partition)
+ EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+ isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+ .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ EasyMock.replay(partition)
+
+ groupMetadataManager.cleanupGroupMetadata()
+
+ // one more offset should expire
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assertEquals(None, group.offset(topicPartition1))
assertEquals(None, group.offset(topicPartition2))
+ assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
- val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
+ cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+
+ // advance time to just before the offset of last partition is to be expired, no offset should expire
+ time.sleep(group.currentStateTimestamp.get + defaultOffsetRetentionMs - time.milliseconds() - 1)
+
+ groupMetadataManager.cleanupGroupMetadata()
+
+ // no offset should expire yet; the last offset is still within its retention period
+ assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+ assertEquals(None, group.offset(topicPartition1))
+ assertEquals(None, group.offset(topicPartition2))
+ assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
+
+ cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+
+ // advance time enough for that last offset to expire
+ time.sleep(2)
+
+ // expect the offset tombstone
+ EasyMock.reset(partition)
+ EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+ isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+ .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ EasyMock.replay(partition)
+
+ groupMetadataManager.cleanupGroupMetadata()
+
+ // group and all its offsets should be gone now
+ assertEquals(None, groupMetadataManager.getGroup(groupId))
+ assertEquals(None, group.offset(topicPartition1))
+ assertEquals(None, group.offset(topicPartition2))
+ assertEquals(None, group.offset(topicPartition3))
+
+ cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition3).map(_.offset))
EasyMock.verify(replicaManager)
+
+ assert(group.is(Dead))
+ }
+
+ @Test
+ def testOffsetExpirationOfSimpleConsumer() {
+ val memberId = "memberId"
+ val clientId = "clientId"
+ val clientHost = "localhost"
+ val topic = "foo"
+ val topicPartition1 = new TopicPartition(topic, 0)
+ val offset = 37
+
+ groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+ val group = new GroupMetadata(groupId, Empty, time)
+ groupMetadataManager.addGroup(group)
+
+ // the offset has no explicit expiry timestamp, so it expires once the default retention has elapsed since commit
+ val startMs = time.milliseconds
+ // no explicit expiry timestamp is set on these offsets
+ val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
+ val tp2OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
+ // the group has no members, so expiry is measured from the offset's commit timestamp
+ val offsets = immutable.Map(
+ topicPartition1 -> tp1OffsetAndMetadata)
+
+ mockGetPartition()
+ expectAppendMessage(Errors.NONE)
+ EasyMock.replay(replicaManager)
+
+ var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicPartition, Errors]) {
+ commitErrors = Some(errors)
+ }
+
+ groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+ assertTrue(group.hasOffsets)
+
+ assertFalse(commitErrors.isEmpty)
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+
+ // do not expire offsets while within retention period since commit timestamp
+ val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + defaultOffsetRetentionMs
+ time.sleep(expiryTimestamp - time.milliseconds() - 1)
+
+ groupMetadataManager.cleanupGroupMetadata()
+
+ // group and offsets should still be there
+ assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+ assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
+
+ var cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1)))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition1).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+
+ // advance time enough for the offset to expire
+ time.sleep(2)
+
+ // expect the offset tombstone
+ EasyMock.reset(partition)
+ EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+ isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+ .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+ EasyMock.replay(partition)
+
+ groupMetadataManager.cleanupGroupMetadata()
+
+ // group and all its offsets should be gone now
+ assertEquals(None, groupMetadataManager.getGroup(groupId))
+ assertEquals(None, group.offset(topicPartition1))
+
+ cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1)))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+
+ assert(group.is(Dead))
+ }
+
+ @Test
+ def testLoadOffsetFromOldCommit() = {
+ val groupMetadataTopicPartition = groupTopicPartition
+ val generation = 935
+ val protocolType = "consumer"
+ val protocol = "range"
+ val startOffset = 15L
+ val committedOffsets = Map(
+ new TopicPartition("foo", 0) -> 23L,
+ new TopicPartition("foo", 1) -> 455L,
+ new TopicPartition("bar", 0) -> 8992L
+ )
+
+ val apiVersion = KAFKA_1_1_IV0
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
+ val memberId = "98098230493"
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+ expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+ EasyMock.replay(replicaManager)
+
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+ assertEquals(groupId, group.groupId)
+ assertEquals(Stable, group.currentState)
+ assertEquals(memberId, group.leaderOrNull)
+ assertEquals(generation, group.generationId)
+ assertEquals(Some(protocolType), group.protocolType)
+ assertEquals(protocol, group.protocolOrNull)
+ assertEquals(Set(memberId), group.allMembers)
+ assertEquals(committedOffsets.size, group.allOffsets.size)
+ committedOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+ assertTrue(group.offset(topicPartition).map(_.expireTimestamp).get.nonEmpty)
+ }
+ }
+
+ @Test
+ def testLoadOffsetWithExplicitRetention() = {
+ val groupMetadataTopicPartition = groupTopicPartition
+ val generation = 935
+ val protocolType = "consumer"
+ val protocol = "range"
+ val startOffset = 15L
+ val committedOffsets = Map(
+ new TopicPartition("foo", 0) -> 23L,
+ new TopicPartition("foo", 1) -> 455L,
+ new TopicPartition("bar", 0) -> 8992L
+ )
+
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTime = Some(100))
+ val memberId = "98098230493"
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+ expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+ EasyMock.replay(replicaManager)
+
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+ assertEquals(groupId, group.groupId)
+ assertEquals(Stable, group.currentState)
+ assertEquals(memberId, group.leaderOrNull)
+ assertEquals(generation, group.generationId)
+ assertEquals(Some(protocolType), group.protocolType)
+ assertEquals(protocol, group.protocolOrNull)
+ assertEquals(Set(memberId), group.allMembers)
+ assertEquals(committedOffsets.size, group.allOffsets.size)
+ committedOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+ assertTrue(group.offset(topicPartition).map(_.expireTimestamp).get.nonEmpty)
+ }
}
private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
@@ -1452,20 +1713,21 @@ class GroupMetadataManagerTest {
+  /**
+   * Builds a group metadata record for a Stable group with a single member, which is also the leader.
+   *
+   * @param apiVersion passed to GroupMetadataManager.groupMetadataValue when serializing the value
+   *                   (defaults to ApiVersion.latestVersion). When it is at least KAFKA_2_1_IV0 the group
+   *                   is loaded with Some(time.milliseconds()), otherwise with None.
+   *                   NOTE(review): presumably that argument is the group's current-state timestamp;
+   *                   confirm against GroupMetadata.loadGroup's signature.
+   */
private def buildStableGroupRecordWithMember(generation: Int,
protocolType: String,
protocol: String,
- memberId: String): SimpleRecord = {
+ memberId: String,
+ apiVersion: ApiVersion = ApiVersion.latestVersion): SimpleRecord = {
val memberProtocols = List((protocol, Array.emptyByteArray))
val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
- val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol,
- leaderId = memberId, Seq(member))
+ val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
+ if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member), time)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
- val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
+ val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]), apiVersion)
new SimpleRecord(groupMetadataKey, groupMetadataValue)
}
+  /**
+   * Builds a group metadata record for an Empty group with no members. Protocol and leader are null,
+   * the timestamp argument is None, and the value is serialized with ApiVersion.latestVersion.
+   */
private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
- val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
+ val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
- val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty)
+ val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, ApiVersion.latestVersion)
new SimpleRecord(groupMetadataKey, groupMetadataValue)
}
@@ -1511,11 +1773,19 @@ class GroupMetadataManagerTest {
}
+  /**
+   * Builds one offset commit record for each entry of committedOffsets, keyed by groupId and partition.
+   *
+   * @param apiVersion    passed to GroupMetadataManager.offsetCommitValue when serializing each value
+   * @param retentionTime when defined, each OffsetAndMetadata is built with empty metadata, commit timestamp
+   *                      time.milliseconds(), and commit timestamp + retentionTime as the fourth argument
+   *                      (presumably the expire timestamp, with retentionTime in milliseconds; confirm).
+   *                      When None, OffsetAndMetadata(offset) is used.
+   *
+   * NOTE(review): the `timestamp` binding in the match below holds a retention duration, not a timestamp;
+   * a name such as `retentionMs` would make the expiry arithmetic clearer.
+   */
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
- groupId: String = groupId): Seq[SimpleRecord] = {
+ groupId: String = groupId,
+ apiVersion: ApiVersion = ApiVersion.latestVersion,
+ retentionTime: Option[Long] = None): Seq[SimpleRecord] = {
committedOffsets.map { case (topicPartition, offset) =>
- val offsetAndMetadata = OffsetAndMetadata(offset)
+ val offsetAndMetadata = retentionTime match {
+ case Some(timestamp) =>
+ val commitTimestamp = time.milliseconds()
+ OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + timestamp)
+ case None =>
+ OffsetAndMetadata(offset)
+ }
val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
- val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+ val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
new SimpleRecord(offsetCommitKey, offsetCommitValue)
}.toSeq
}
@@ -1542,7 +1812,7 @@ class GroupMetadataManagerTest {
def testMetrics() {
groupMetadataManager.cleanupGroupMetadata()
expectMetrics(groupMetadataManager, 0, 0, 0)
- val group = new GroupMetadata("foo2", Stable)
+ val group = new GroupMetadata("foo2", Stable, time)
groupMetadataManager.addGroup(group)
expectMetrics(groupMetadataManager, 1, 0, 0)
group.transitionTo(PreparingRebalance)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 183860f..9054533 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -18,9 +18,8 @@
package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
-
import org.apache.kafka.common.TopicPartition
-
+import org.apache.kafka.common.utils.Time
import org.junit.Assert._
import org.junit.{Before, Test}
import org.scalatest.junit.JUnitSuite
@@ -40,7 +39,7 @@ class GroupMetadataTest extends JUnitSuite {
+ /** Before each test, creates a fresh group "groupId" in the Empty state, backed by the system clock (Time.SYSTEM). */
@Before
def setUp() {
- group = new GroupMetadata("groupId", initialState = Empty)
+ group = new GroupMetadata("groupId", Empty, Time.SYSTEM)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 0205bcf..d91e008 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.OFFSET_COMMIT =>
new OffsetCommitRequest.Builder("test-group",
Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
- setMemberId("").setGenerationId(1).setRetentionTime(1000)
+ setMemberId("").setGenerationId(1)
case ApiKeys.OFFSET_FETCH =>
new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index cb246f6..26c0d15 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,18 @@
<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
+<p><b>Additional Upgrade Notes:</b></p>
+
+<ol>
+ <li>Offset expiration semantics have changed slightly in this version. Under the new semantics, offsets of partitions in a group are
+ not removed while the group is subscribed to the corresponding topic and is still active (has active consumers). If the group becomes
+ empty, all its offsets are removed once the offset retention period configured on the broker (<code>offsets.retention.minutes</code>) has passed,
+ unless the group becomes active again in the meantime. Offsets associated with standalone (simple) consumers, which do not use Kafka group management,
+ are removed once the same offset retention period has passed since their last commit.</li>
+</ol>
+
+
<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4>
<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading.
@@ -66,6 +78,9 @@
until all brokers in the cluster have been updated.
<p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.
</li>
+ <li>When no <code>group.id</code> is provided, the console consumer now sets <code>enable.auto.commit</code> to <code>false</code> by default.
+ This avoids polluting the consumer coordinator cache, since the auto-generated group is unlikely to be used by other consumers.
+ </li>
</ol>
<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
@@ -112,9 +127,9 @@
<code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li>
<li>Also as part of KIP-266, the default value of <code>request.timeout.ms</code> has been changed to 30 seconds.
The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take.
- Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from
+ Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from
<code>max.poll.interval.ms</code> for the request timeout. All other request types use the timeout defined
- by <code>request.timeout.ms</code></li>
+ by <code>request.timeout.ms</code></li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> removes