You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/03 08:36:21 UTC
[2/3] kafka git commit: KAFKA-2017: Persist Group Metadata and
Assignment before Responding
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 7f7df9a..6a76241 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -49,9 +49,12 @@ private[coordinator] class MemberMetadata(val memberId: String,
val sessionTimeoutMs: Int,
var supportedProtocols: List[(String, Array[Byte])]) {
- var assignment: Array[Byte] = null
+ // NOTE: we need to add memory barrier to assignment and awaitingSyncCallback
+ // since they can be accessed in the append callback thread that does not
+ // hold on the group object lock
+ @volatile var assignment: Array[Byte] = null
var awaitingJoinCallback: JoinGroupResult => Unit = null
- var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+ @volatile var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
var latestHeartbeat: Long = -1
var isLeaving: Boolean = false
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/OffsetConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/OffsetConfig.scala b/core/src/main/scala/kafka/coordinator/OffsetConfig.scala
new file mode 100644
index 0000000..92f56b2
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/OffsetConfig.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.message.{NoCompressionCodec, CompressionCodec}
+
+/**
+ * Configuration settings for in-built offset management
+ * @param maxMetadataSize The maximum allowed metadata for any offset commit.
+ * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
+ * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
+ * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
+ * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
+ * log compaction and faster offset loads
+ * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
+ * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
+ * order to achieve "atomic" commits.
+ * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
+ * commit or this timeout is reached. (Similar to the producer request timeout.)
+ * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
+ * should not be overridden.
+ */
+case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize,
+ loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize,
+ offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs,
+ offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs,
+ offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions,
+ offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes,
+ offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor,
+ offsetsTopicCompressionCodec: CompressionCodec = OffsetConfig.DefaultOffsetsTopicCompressionCodec,
+ offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs,
+ offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks)
+
+object OffsetConfig {
+ val DefaultMaxMetadataSize = 4096
+ val DefaultLoadBufferSize = 5*1024*1024
+ val DefaultOffsetRetentionMs = 24*60*60*1000L
+ val DefaultOffsetsRetentionCheckIntervalMs = 600000L
+ val DefaultOffsetsTopicNumPartitions = 50
+ val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
+ val DefaultOffsetsTopicReplicationFactor = 3.toShort
+ val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
+ val DefaultOffsetCommitTimeoutMs = 5000
+ val DefaultOffsetCommitRequiredAcks = (-1).toShort
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dfcfa25..35c5956 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -116,12 +116,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// for each new leader or follower, call coordinator to handle
// consumer group migration
result.updatedLeaders.foreach { case partition =>
- if (partition.topic == GroupCoordinator.OffsetsTopicName)
+ if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
result.updatedFollowers.foreach { case partition =>
partition.leaderReplicaIdOpt.foreach { leaderReplica =>
- if (partition.topic == GroupCoordinator.OffsetsTopicName &&
+ if (partition.topic == GroupCoordinator.GroupMetadataTopicName &&
leaderReplica == brokerId)
coordinator.handleGroupEmigration(partition.partitionId)
}
@@ -546,9 +546,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topics.size > 0 && topicResponses.size != topics.size) {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
- if (topic == GroupCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
+ if (topic == GroupCoordinator.GroupMetadataTopicName || config.autoCreateTopicsEnable) {
try {
- if (topic == GroupCoordinator.OffsetsTopicName) {
+ if (topic == GroupCoordinator.GroupMetadataTopicName) {
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.length > 0)
@@ -689,7 +689,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val partition = coordinator.partitionFor(groupMetadataRequest.group)
// get metadata (and create the topic if necessary)
- val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.OffsetsTopicName), request.securityProtocol).head
+ val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
@@ -717,6 +717,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
joinResult.memberId, joinResult.leaderId, members)
+
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
@@ -738,6 +739,7 @@ class KafkaApis(val requestChannel: RequestChannel,
coordinator.handleJoinGroup(
joinGroupRequest.groupId(),
joinGroupRequest.memberId(),
+ request.header.clientId(),
joinGroupRequest.sessionTimeout(),
joinGroupRequest.protocolType(),
protocols,
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b749446..2973dab 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -23,6 +23,7 @@ import java.util.Properties
import kafka.api.ApiVersion
import kafka.cluster.EndPoint
import kafka.consumer.ConsumerConfig
+import kafka.coordinator.OffsetConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.CommonClientConfigs
@@ -129,16 +130,16 @@ object Defaults {
val ConsumerMaxSessionTimeoutMs = 30000
/** ********* Offset management configuration ***********/
- val OffsetMetadataMaxSize = OffsetManagerConfig.DefaultMaxMetadataSize
- val OffsetsLoadBufferSize = OffsetManagerConfig.DefaultLoadBufferSize
- val OffsetsTopicReplicationFactor = OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor
- val OffsetsTopicPartitions: Int = OffsetManagerConfig.DefaultOffsetsTopicNumPartitions
- val OffsetsTopicSegmentBytes: Int = OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes
- val OffsetsTopicCompressionCodec: Int = OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec.codec
+ val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
+ val OffsetsLoadBufferSize = OffsetConfig.DefaultLoadBufferSize
+ val OffsetsTopicReplicationFactor = OffsetConfig.DefaultOffsetsTopicReplicationFactor
+ val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions
+ val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes
+ val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionCodec.codec
val OffsetsRetentionMinutes: Int = 24 * 60
- val OffsetsRetentionCheckIntervalMs: Long = OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs
- val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs
- val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks
+ val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs
+ val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs
+ val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 732eb55..80cc6f1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -50,7 +50,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
import kafka.network.{BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{GroupManagerConfig, GroupCoordinator}
+import kafka.coordinator.{GroupConfig, GroupCoordinator}
object KafkaServer {
// Copy the subset of properties that are relevant to Logs
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
deleted file mode 100755
index 967dc6f..0000000
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ /dev/null
@@ -1,627 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
-import org.apache.kafka.common.protocol.types.Type.STRING
-import org.apache.kafka.common.protocol.types.Type.INT32
-import org.apache.kafka.common.protocol.types.Type.INT64
-import org.apache.kafka.common.utils.Utils
-
-import kafka.utils._
-import kafka.common._
-import kafka.log.FileMessageSet
-import kafka.message._
-import kafka.metrics.KafkaMetricsGroup
-import kafka.common.TopicAndPartition
-import kafka.tools.MessageFormatter
-import kafka.api.ProducerResponseStatus
-import kafka.coordinator.GroupCoordinator
-
-import scala.Some
-import scala.collection._
-import java.io.PrintStream
-import java.util.concurrent.atomic.AtomicBoolean
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-
-import com.yammer.metrics.core.Gauge
-import org.I0Itec.zkclient.ZkClient
-
-/**
- * Configuration settings for in-built offset management
- * @param maxMetadataSize The maximum allowed metadata for any offset commit.
- * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
- * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
- * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
- * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
- * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
- * log compaction and faster offset loads
- * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
- * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
- * order to achieve "atomic" commits.
- * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
- * commit or this timeout is reached. (Similar to the producer request timeout.)
- * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
- * should not be overridden.
- */
-case class OffsetManagerConfig(maxMetadataSize: Int = OffsetManagerConfig.DefaultMaxMetadataSize,
- loadBufferSize: Int = OffsetManagerConfig.DefaultLoadBufferSize,
- offsetsRetentionMs: Long = OffsetManagerConfig.DefaultOffsetRetentionMs,
- offsetsRetentionCheckIntervalMs: Long = OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions: Int = OffsetManagerConfig.DefaultOffsetsTopicNumPartitions,
- offsetsTopicSegmentBytes: Int = OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes,
- offsetsTopicReplicationFactor: Short = OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor,
- offsetsTopicCompressionCodec: CompressionCodec = OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec,
- offsetCommitTimeoutMs: Int = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs,
- offsetCommitRequiredAcks: Short = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks)
-
-object OffsetManagerConfig {
- val DefaultMaxMetadataSize = 4096
- val DefaultLoadBufferSize = 5*1024*1024
- val DefaultOffsetRetentionMs = 24*60*60*1000L
- val DefaultOffsetsRetentionCheckIntervalMs = 600000L
- val DefaultOffsetsTopicNumPartitions = 50
- val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
- val DefaultOffsetsTopicReplicationFactor = 3.toShort
- val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
- val DefaultOffsetCommitTimeoutMs = 5000
- val DefaultOffsetCommitRequiredAcks = (-1).toShort
-}
-
-class OffsetManager(val config: OffsetManagerConfig,
- replicaManager: ReplicaManager,
- zkUtils: ZkUtils,
- scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
-
- /* offsets and metadata cache */
- private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
- private val followerTransitionLock = new Object
- private val loadingPartitions: mutable.Set[Int] = mutable.Set()
- private val cleanupOrLoadMutex = new Object
- private val shuttingDown = new AtomicBoolean(false)
- private val offsetsTopicPartitionCount = getOffsetsTopicPartitionCount
-
- this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
-
- scheduler.schedule(name = "delete-expired-consumer-offsets",
- fun = deleteExpiredOffsets,
- period = config.offsetsRetentionCheckIntervalMs,
- unit = TimeUnit.MILLISECONDS)
-
- newGauge("NumOffsets",
- new Gauge[Int] {
- def value = offsetsCache.size
- }
- )
-
- newGauge("NumGroups",
- new Gauge[Int] {
- def value = offsetsCache.keys.map(_.group).toSet.size
- }
- )
-
- private def deleteExpiredOffsets() {
- debug("Collecting expired offsets.")
- val startMs = SystemTime.milliseconds
-
- val numExpiredOffsetsRemoved = cleanupOrLoadMutex synchronized {
- val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
- offsetAndMetadata.expireTimestamp < startMs
- }
-
- debug("Found %d expired offsets.".format(expiredOffsets.size))
-
- // delete the expired offsets from the table and generate tombstone messages to remove them from the log
- val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
- val offsetsPartition = partitionFor(groupTopicAndPartition.group)
- trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
-
- offsetsCache.remove(groupTopicAndPartition)
-
- val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
- groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
-
- (offsetsPartition, new Message(bytes = null, key = commitKey))
- }.groupBy { case (partition, tombstone) => partition }
-
- // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
- // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
- tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
- val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
- partitionOpt.map { partition =>
- val appendPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
- val messages = tombstones.map(_._2).toSeq
-
- trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
-
- try {
- // do not need to require acks since even if the tombsone is lost,
- // it will be appended again in the next purge cycle
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
- tombstones.size
- }
- catch {
- case t: Throwable =>
- error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
- // ignore and continue
- 0
- }
- }
- }.sum
- }
-
- info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
- }
-
-
- def partitionFor(group: String): Int = Utils.abs(group.hashCode) % offsetsTopicPartitionCount
-
- /**
- * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
- *
- * @param key The requested group-topic-partition
- * @return If the key is present, return the offset and metadata; otherwise return None
- */
- private def getOffset(key: GroupTopicPartition) = {
- val offsetAndMetadata = offsetsCache.get(key)
- if (offsetAndMetadata == null)
- OffsetMetadataAndError.NoOffset
- else
- OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
- }
-
- /**
- * Put the (already committed) offset for the given group/topic/partition into the cache.
- *
- * @param key The group-topic-partition
- * @param offsetAndMetadata The offset/metadata to be stored
- */
- private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
- offsetsCache.put(key, offsetAndMetadata)
- }
-
- /*
- * Check if the offset metadata length is valid
- */
- def validateOffsetMetadataLength(metadata: String) : Boolean = {
- metadata == null || metadata.length() <= config.maxMetadataSize
- }
-
- /**
- * Store offsets by appending it to the replicated log and then inserting to cache
- */
- def storeOffsets(groupId: String,
- consumerId: String,
- generationId: Int,
- offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
- // first filter out partitions with offset metadata size exceeding limit
- val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
- validateOffsetMetadataLength(offsetAndMetadata.metadata)
- }
-
- // construct the message set to append
- val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- new Message(
- key = OffsetManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
- bytes = OffsetManager.offsetCommitValue(offsetAndMetadata)
- )
- }.toSeq
-
- val offsetTopicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, partitionFor(groupId))
-
- val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
- new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
-
- // set the callback function to insert offsets into cache after log append completed
- def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
- // the append response should only contain the topics partition
- if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
- throw new IllegalStateException("Append status %s should only have one partition %s"
- .format(responseStatus, offsetTopicPartition))
-
- // construct the commit response status and insert
- // the offset and metadata to cache if the append status has no error
- val status = responseStatus(offsetTopicPartition)
-
- val responseCode =
- if (status.error == ErrorMapping.NoError) {
- filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
- putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
- }
- ErrorMapping.NoError
- } else {
- debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
- .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
-
- // transform the log append error code to the corresponding the commit status error code
- if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
- ErrorMapping.ConsumerCoordinatorNotAvailableCode
- else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
- ErrorMapping.NotCoordinatorForConsumerCode
- else if (status.error == ErrorMapping.MessageSizeTooLargeCode
- || status.error == ErrorMapping.MessageSetSizeTooLargeCode
- || status.error == ErrorMapping.InvalidFetchSizeCode)
- Errors.INVALID_COMMIT_OFFSET_SIZE.code
- else
- status.error
- }
-
-
- // compute the final error codes for the commit response
- val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
- (topicAndPartition, responseCode)
- else
- (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
- }
-
- // finally trigger the callback logic passed from the API layer
- responseCallback(commitStatus)
- }
-
- // call replica manager to append the offset messages
- replicaManager.appendMessages(
- config.offsetCommitTimeoutMs.toLong,
- config.offsetCommitRequiredAcks,
- true, // allow appending to internal offset topic
- offsetsAndMetadataMessageSet,
- putCacheCallback)
- }
-
- /**
- * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
- * returns the current offset or it begins to sync the cache from the log (and returns an error code).
- */
- def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
- trace("Getting offsets %s for group %s.".format(topicPartitions, group))
-
- val offsetsPartition = partitionFor(group)
-
- /**
- * followerTransitionLock protects against fetching from an empty/cleared offset cache (i.e., cleared due to a
- * leader->follower transition). i.e., even if leader-is-local is true a follower transition can occur right after
- * the check and clear the cache. i.e., we would read from the empty cache and incorrectly return NoOffset.
- */
- followerTransitionLock synchronized {
- if (leaderIsLocal(offsetsPartition)) {
- if (loadingPartitions synchronized loadingPartitions.contains(offsetsPartition)) {
- debug("Cannot fetch offsets for group %s due to ongoing offset load.".format(group))
- topicPartitions.map { topicAndPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
- (groupTopicPartition.topicPartition, OffsetMetadataAndError.OffsetsLoading)
- }.toMap
- } else {
- if (topicPartitions.size == 0) {
- // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
- offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
- (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
- }.toMap
- } else {
- topicPartitions.map { topicAndPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
- (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
- }.toMap
- }
- }
- } else {
- debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
- topicPartitions.map { topicAndPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
- (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
- }.toMap
- }
- }
- }
-
- /**
- * Asynchronously read the partition from the offsets topic and populate the cache
- */
- def loadOffsetsFromLog(offsetsPartition: Int) {
-
- val topicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
-
- loadingPartitions synchronized {
- if (loadingPartitions.contains(offsetsPartition)) {
- info("Offset load from %s already in progress.".format(topicPartition))
- } else {
- loadingPartitions.add(offsetsPartition)
- scheduler.schedule(topicPartition.toString, loadOffsets)
- }
- }
-
- def loadOffsets() {
- info("Loading offsets from " + topicPartition)
-
- val startMs = SystemTime.milliseconds
- try {
- replicaManager.logManager.getLog(topicPartition) match {
- case Some(log) =>
- var currOffset = log.logSegments.head.baseOffset
- val buffer = ByteBuffer.allocate(config.loadBufferSize)
- // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
- cleanupOrLoadMutex synchronized {
- while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
- buffer.clear()
- val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
- messages.readInto(buffer, 0)
- val messageSet = new ByteBufferMessageSet(buffer)
- messageSet.foreach { msgAndOffset =>
- require(msgAndOffset.message.key != null, "Offset entry key should not be null")
- val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
- if (msgAndOffset.message.payload == null) {
- if (offsetsCache.remove(key) != null)
- trace("Removed offset for %s due to tombstone entry.".format(key))
- else
- trace("Ignoring redundant tombstone for %s.".format(key))
- } else {
- // special handling for version 0:
- // set the expiration time stamp as commit time stamp + server default retention time
- val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
- putOffset(key, value.copy (
- expireTimestamp = {
- if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
- value.commitTimestamp + config.offsetsRetentionMs
- else
- value.expireTimestamp
- }
- ))
- trace("Loaded offset %s for %s.".format(value, key))
- }
- currOffset = msgAndOffset.nextOffset
- }
- }
- }
-
- if (!shuttingDown.get())
- info("Finished loading offsets from %s in %d milliseconds."
- .format(topicPartition, SystemTime.milliseconds - startMs))
- case None =>
- warn("No log found for " + topicPartition)
- }
- }
- catch {
- case t: Throwable =>
- error("Error in loading offsets from " + topicPartition, t)
- }
- finally {
- loadingPartitions synchronized loadingPartitions.remove(offsetsPartition)
- }
- }
- }
-
- private def getHighWatermark(partitionId: Int): Long = {
- val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, partitionId)
-
- val hw = partitionOpt.map { partition =>
- partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
- }.getOrElse(-1L)
-
- hw
- }
-
- def leaderIsLocal(partition: Int) = { getHighWatermark(partition) != -1L }
-
- /**
- * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
- * that partition.
- * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
- */
- def removeOffsetsFromCacheForPartition(offsetsPartition: Int) {
- var numRemoved = 0
- followerTransitionLock synchronized {
- offsetsCache.keys.foreach { key =>
- if (partitionFor(key.group) == offsetsPartition) {
- offsetsCache.remove(key)
- numRemoved += 1
- }
- }
- }
-
- if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
- .format(numRemoved, TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)))
- }
-
- def shutdown() {
- shuttingDown.set(true)
- }
-
- /**
- * Gets the partition count of the offsets topic from ZooKeeper.
- * If the topic does not exist, the configured partition count is returned.
- */
- private def getOffsetsTopicPartitionCount = {
- val topic = GroupCoordinator.OffsetsTopicName
- val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
- if (topicData(topic).nonEmpty)
- topicData(topic).size
- else
- config.offsetsTopicNumPartitions
- }
-}
-
-object OffsetManager {
-
- private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
-
- private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort
-
- private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
- new Field("topic", STRING),
- new Field("partition", INT32))
- private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
- private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
- private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
-
- private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
- new Field("metadata", STRING, "Associated metadata.", ""),
- new Field("timestamp", INT64))
-
- private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
- new Field("metadata", STRING, "Associated metadata.", ""),
- new Field("commit_timestamp", INT64),
- new Field("expire_timestamp", INT64))
-
- private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
- private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
- private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
- private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
- private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
- private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
- private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
- // map of versions to schemas
- private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
- 1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1))
-
- private val CURRENT_SCHEMA = schemaFor(CURRENT_OFFSET_SCHEMA_VERSION)
-
- private def schemaFor(version: Int) = {
- val schemaOpt = OFFSET_SCHEMAS.get(version)
- schemaOpt match {
- case Some(schema) => schema
- case _ => throw new KafkaException("Unknown offset schema version " + version)
- }
- }
-
- /**
- * Generates the key for offset commit message for given (group, topic, partition)
- *
- * @return key for offset commit message
- */
- private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
- val key = new Struct(CURRENT_SCHEMA.keySchema)
- key.set(KEY_GROUP_FIELD, group)
- key.set(KEY_TOPIC_FIELD, topic)
- key.set(KEY_PARTITION_FIELD, partition)
-
- val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
- byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION)
- key.writeTo(byteBuffer)
- byteBuffer.array()
- }
-
- /**
- * Generates the payload for offset commit message from given offset and metadata
- *
- * @param offsetAndMetadata consumer's current offset and metadata
- * @return payload for offset commit message
- */
- private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
- // generate commit value with schema version 1
- val value = new Struct(CURRENT_SCHEMA.valueSchema)
- value.set(VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
- value.set(VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
- value.set(VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
- value.set(VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
- val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
- byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION)
- value.writeTo(byteBuffer)
- byteBuffer.array()
- }
-
- /**
- * Decodes the offset messages' key
- *
- * @param buffer input byte-buffer
- * @return an GroupTopicPartition object
- */
- private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
- val version = buffer.getShort()
- val keySchema = schemaFor(version).keySchema
- val key = keySchema.read(buffer).asInstanceOf[Struct]
-
- val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
- val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
- val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
-
- GroupTopicPartition(group, TopicAndPartition(topic, partition))
- }
-
- /**
- * Decodes the offset messages' payload and retrieves offset and metadata from it
- *
- * @param buffer input byte-buffer
- * @return an offset-metadata object from the message
- */
- private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
- val structAndVersion = readMessageValueStruct(buffer)
-
- if (structAndVersion.value == null) { // tombstone
- null
- } else {
- if (structAndVersion.version == 0) {
- val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
- val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
- val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
-
- OffsetAndMetadata(offset, metadata, timestamp)
- } else if (structAndVersion.version == 1) {
- val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
- val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
- val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
- val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
-
- OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
- } else {
- throw new IllegalStateException("Unknown offset message version")
- }
- }
- }
-
- private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
- if(buffer == null) { // tombstone
- MessageValueStructAndVersion(null, -1)
- } else {
- val version = buffer.getShort()
- val valueSchema = schemaFor(version).valueSchema
- val value = valueSchema.read(buffer).asInstanceOf[Struct]
-
- MessageValueStructAndVersion(value, version)
- }
- }
-
- // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
- // (specify --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
- class OffsetsMessageFormatter extends MessageFormatter {
- def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
- val formattedKey = if (key == null) "NULL" else OffsetManager.readMessageKey(ByteBuffer.wrap(key)).toString
- val formattedValue = if (value == null) "NULL" else OffsetManager.readMessageValueStruct(ByteBuffer.wrap(value)).value.toString
- output.write(formattedKey.getBytes)
- output.write("::".getBytes)
- output.write(formattedValue.getBytes)
- output.write("\n".getBytes)
- }
- }
-
-}
-
-case class MessageValueStructAndVersion(value: Struct, version: Short)
-
-case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
-
- def this(group: String, topic: String, partition: Int) =
- this(group, new TopicAndPartition(topic, partition))
-
- override def toString =
- "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b1db12a..3d484b8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -148,7 +148,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
+ TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
1,
1,
servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index db4295c..9487c77 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -193,7 +193,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// get metadata for the topic
var parts: Seq[PartitionInfo] = null
while (parts == null)
- parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
+ parts = consumer0.partitionsFor(GroupCoordinator.GroupMetadataTopicName).asScala
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index bbc9a54..b7ecc9d 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -69,7 +69,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
}
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
+ TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 820a825..f14e25a 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -85,12 +85,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
- "--topic", GroupCoordinator.OffsetsTopicName))
+ "--topic", GroupCoordinator.GroupMetadataTopicName))
TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
- // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
- val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.OffsetsTopicName))
- val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.OffsetsTopicName)
+ // try to delete the GroupCoordinator.GroupMetadataTopicName and make sure it doesn't
+ val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.GroupMetadataTopicName))
+ val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.GroupMetadataTopicName)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 24fba45..1e8d04e 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -21,7 +21,6 @@ package kafka.consumer
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
-import kafka.server.OffsetManager
import kafka.coordinator.GroupCoordinator
@@ -38,8 +37,8 @@ class TopicFilterTest extends JUnitSuite {
val topicFilter2 = new Whitelist(".+")
assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
- assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true))
- assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false))
+ assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
val topicFilter3 = new Whitelist("white_listed-topic.+")
assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -58,8 +57,8 @@ class TopicFilterTest extends JUnitSuite {
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
- assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true))
- assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false))
+ assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
deleted file mode 100644
index 49a237b..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-
-import org.junit.Assert._
-import org.junit.{Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test coordinator group and topic metadata management
- */
-class CoordinatorMetadataTest extends JUnitSuite {
- val DefaultNumPartitions = 8
- val DefaultNumReplicas = 2
- var coordinatorMetadata: CoordinatorMetadata = null
-
- @Before
- def setUp() {
- val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
- coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId)
- }
-
- @Test
- def testGetNonexistentGroup() {
- assertNull(coordinatorMetadata.getGroup("group"))
- }
-
- @Test
- def testGetGroup() {
- val groupId = "group"
- val protocolType = "consumer"
- val expected = coordinatorMetadata.addGroup(groupId, protocolType)
- val actual = coordinatorMetadata.getGroup(groupId)
- assertEquals(expected, actual)
- }
-
- @Test
- def testAddGroupReturnsPreexistingGroupIfItAlreadyExists() {
- val groupId = "group"
- val protocolType = "consumer"
- val group1 = coordinatorMetadata.addGroup(groupId, protocolType)
- val group2 = coordinatorMetadata.addGroup(groupId, protocolType)
- assertEquals(group1, group2)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testRemoveNonexistentGroup() {
- val groupId = "group"
- val topics = Set("a")
- coordinatorMetadata.removeGroup(groupId)
- }
-
-}