You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/17 20:33:51 UTC
kafka git commit: KAFKA-2832: Add a consumer config option to exclude
internal topics
Repository: kafka
Updated Branches:
refs/heads/trunk dce06766d -> 30e78fa00
KAFKA-2832: Add a consumer config option to exclude internal topics
A new consumer config option 'exclude.internal.topics' was added to
allow excluding internal topics when a wildcard (regex) pattern is used
to specify the topics a consumer subscribes to.
The new option takes a boolean value, with a default 'true' value (i.e.
internal topics are excluded from pattern subscriptions).
This patch is co-authored with rajinisivaram, edoardocomar and mimaison.
Author: edoardo <ec...@uk.ibm.com>
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Ismael Juma, Jun Rao, Gwen Shapira
Closes #1082 from edoardocomar/KAFKA-2832
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30e78fa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30e78fa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30e78fa0
Branch: refs/heads/trunk
Commit: 30e78fa00650b258f3ab5ef6c9bdf5ca137289c0
Parents: dce0676
Author: edoardo <ec...@uk.ibm.com>
Authored: Thu Mar 17 12:33:47 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Mar 17 12:33:47 2016 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 11 ++++++
.../kafka/clients/consumer/KafkaConsumer.java | 3 +-
.../consumer/internals/ConsumerCoordinator.java | 9 +++--
.../kafka/common/internals/TopicConstants.java | 33 +++++++++++++++++
.../internals/ConsumerCoordinatorTest.java | 37 ++++++++++++++++----
.../main/scala/kafka/admin/TopicCommand.scala | 7 ++--
core/src/main/scala/kafka/common/Topic.scala | 2 --
.../main/scala/kafka/consumer/TopicFilter.scala | 5 +--
.../kafka/coordinator/GroupCoordinator.scala | 3 --
.../coordinator/GroupMetadataManager.scala | 30 ++++++++--------
.../src/main/scala/kafka/server/KafkaApis.scala | 11 +++---
.../scala/kafka/server/ReplicaManager.scala | 5 ++-
.../kafka/api/AuthorizerIntegrationTest.scala | 5 ++-
.../kafka/api/BaseConsumerTest.scala | 7 ++--
.../kafka/api/IntegrationTestHarness.scala | 4 +--
.../kafka/api/ProducerFailureHandlingTest.scala | 4 +--
.../unit/kafka/admin/TopicCommandTest.scala | 9 ++---
.../unit/kafka/consumer/TopicFilterTest.scala | 9 ++---
.../GroupCoordinatorResponseTest.scala | 14 ++++----
19 files changed, 135 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index bd9efc3..9101307 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -172,6 +172,12 @@ public class ConsumerConfig extends AbstractConfig {
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
+ /** <code>exclude.internal.topics</code> */
+ public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
+ private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
+ + "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";
+ public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true;
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -316,6 +322,11 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(1),
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC)
+ .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
+ Type.BOOLEAN,
+ EXCLUDE_INTERNAL_TOPICS_DEFAULT,
+ Importance.MEDIUM,
+ EXCLUDE_INTERNAL_TOPICS_DOC)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d7c8e14..804a160 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -612,7 +612,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors);
+ this.interceptors,
+ config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2ae1437..cf93530 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -69,6 +70,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final boolean autoCommitEnabled;
private final AutoCommitTask autoCommitTask;
private final ConsumerInterceptors<?, ?> interceptors;
+ private final boolean excludeInternalTopics;
/**
* Initialize the coordination manager.
@@ -87,7 +89,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
long autoCommitIntervalMs,
- ConsumerInterceptors<?, ?> interceptors) {
+ ConsumerInterceptors<?, ?> interceptors,
+ boolean excludeInternalTopics) {
super(client,
groupId,
sessionTimeoutMs,
@@ -110,6 +113,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
+ this.excludeInternalTopics = excludeInternalTopics;
}
@Override
@@ -140,7 +144,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
final List<String> topicsToSubscribe = new ArrayList<>();
for (String topic : cluster.topics())
- if (subscriptions.getSubscribedPattern().matcher(topic).matches())
+ if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
+ !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic)))
topicsToSubscribe.add(topic);
subscriptions.changeSubscription(topicsToSubscribe);
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
new file mode 100644
index 0000000..5d6b992
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+public final class TopicConstants {
+
+ //avoid instantiation
+ private TopicConstants() {
+ }
+
+ // TODO: we store both group metadata and offset data here despite the topic name being offsets only
+ public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
+ public static final Collection<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME)));
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0b8a162..260ee7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -63,6 +65,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -107,7 +110,7 @@ public class ConsumerCoordinatorTest {
this.partitionAssignor.clear();
client.setNode(node);
- this.coordinator = buildCoordinator(metrics, assignors);
+ this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
}
@After
@@ -263,7 +266,7 @@ public class ConsumerCoordinatorTest {
}
@Test(expected = ApiException.class)
- public void testJoinGroupInvalidGroupId() {
+ public void testJoinGroupInvalidGroupId() {
final String consumerId = "leader";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -509,7 +512,7 @@ public class ConsumerCoordinatorTest {
}
@Test
- public void testMetadataChangeTriggersRebalance() {
+ public void testMetadataChangeTriggersRebalance() {
final String consumerId = "consumer";
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -533,6 +536,25 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testExcludeInternalTopicsConfigOption() {
+ subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+
+ metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ }
+
+ @Test
+ public void testIncludeInternalTopicsConfigOption() {
+ coordinator = buildCoordinator(new Metrics(), assignors, false);
+ subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+
+ metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+
+ assertTrue(subscriptions.partitionAssignmentNeeded());
+ }
+
+ @Test
public void testRejoinGroup() {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
@@ -882,7 +904,7 @@ public class ConsumerCoordinatorTest {
RangeAssignor range = new RangeAssignor();
try (Metrics metrics = new Metrics(time)) {
- ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range));
+ ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -890,7 +912,7 @@ public class ConsumerCoordinatorTest {
}
try (Metrics metrics = new Metrics(time)) {
- ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin));
+ ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(range.name(), metadata.get(0).name());
@@ -898,7 +920,7 @@ public class ConsumerCoordinatorTest {
}
}
- private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) {
+ private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics) {
return new ConsumerCoordinator(
consumerClient,
groupId,
@@ -914,7 +936,8 @@ public class ConsumerCoordinatorTest {
defaultOffsetCommitCallback,
autoCommitEnabled,
autoCommitIntervalMs,
- null);
+ null,
+ excludeInternalTopics);
}
private Struct consumerMetadataResponse(Node node, short error) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e89e09d..b3b0635 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -18,7 +18,6 @@
package kafka.admin
import java.util.Properties
-
import joptsimple._
import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
import kafka.consumer.{ConsumerConfig, Whitelist}
@@ -30,9 +29,9 @@ import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
-
import scala.collection.JavaConversions._
import scala.collection._
+import org.apache.kafka.common.internals.TopicConstants
object TopicCommand extends Logging {
@@ -138,7 +137,7 @@ object TopicCommand extends Logging {
}
if(opts.options.has(opts.partitionsOpt)) {
- if (topic == GroupCoordinator.GroupMetadataTopicName) {
+ if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
}
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
@@ -171,7 +170,7 @@ object TopicCommand extends Logging {
}
topics.foreach { topic =>
try {
- if (Topic.InternalTopics.contains(topic)) {
+ if (TopicConstants.INTERNAL_TOPICS.contains(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 55d2bdb..930d0e4 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -25,8 +25,6 @@ object Topic {
private val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
- val InternalTopics = Set(GroupCoordinator.GroupMetadataTopicName)
-
def validate(topic: String) {
if (topic.length <= 0)
throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty")
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 5a13540..b89968e 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -21,6 +21,7 @@ package kafka.consumer
import kafka.utils.Logging
import java.util.regex.{PatternSyntaxException, Pattern}
import kafka.common.Topic
+import org.apache.kafka.common.internals.TopicConstants
sealed abstract class TopicFilter(rawRegex: String) extends Logging {
@@ -47,7 +48,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
- val allowed = topic.matches(regex) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
+ val allowed = topic.matches(regex) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
@@ -60,7 +61,7 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
- val allowed = (!topic.matches(regex)) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
+ val allowed = (!topic.matches(regex)) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 36d7bbb..30a3a78 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -725,9 +725,6 @@ object GroupCoordinator {
val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
- // TODO: we store both group metadata and offset data here despite the topic name being offsets only
- val GroupMetadataTopicName = "__consumer_offsets"
-
def apply(config: KafkaConfig,
zkUtils: ZkUtils,
replicaManager: ReplicaManager,
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2c0236e..c6bc44e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -18,7 +18,6 @@
package kafka.coordinator
import java.util.concurrent.locks.ReentrantReadWriteLock
-
import kafka.utils.CoreUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
@@ -40,14 +39,13 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import kafka.common.MessageFormatter
import kafka.server.ReplicaManager
-
import scala.collection._
import java.io.PrintStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit
-
import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.internals.TopicConstants
case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
callback: Map[TopicPartition, PartitionResponse] => Unit)
@@ -147,9 +145,9 @@ class GroupMetadataManager(val brokerId: Int,
val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
timestamp = timestamp, magicValue = magicValue)
- val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+ val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+ val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
trace("Marking group %s as deleted.".format(group.groupId))
@@ -177,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int,
timestamp = timestamp,
magicValue = magicValue)
- val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+ val groupMetadataPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val groupMetadataMessageSet = Map(groupMetadataPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -263,7 +261,7 @@ class GroupMetadataManager(val brokerId: Int,
)
}.toSeq
- val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+ val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))
val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -351,7 +349,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def loadGroupsForPartition(offsetsPartition: Int,
onGroupLoaded: GroupMetadata => Unit) {
- val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+ val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
def loadGroupsAndOffsets() {
@@ -470,7 +468,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
- val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+ val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
def removeGroupsAndOffsets() {
@@ -507,10 +505,10 @@ class GroupMetadataManager(val brokerId: Int,
}
if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
- .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+ .format(numOffsetsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
- .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+ .format(numGroupsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
}
}
@@ -566,9 +564,9 @@ class GroupMetadataManager(val brokerId: Int,
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
- val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+ val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
partitionOpt.map { partition =>
- val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+ val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
val messages = tombstones.map(_._2).toSeq
trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -593,7 +591,7 @@ class GroupMetadataManager(val brokerId: Int,
}
private def getHighWatermark(partitionId: Int): Long = {
- val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, partitionId)
+ val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionId)
val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -621,7 +619,7 @@ class GroupMetadataManager(val brokerId: Int,
* If the topic does not exist, the configured partition count is returned.
*/
private def getOffsetsTopicPartitionCount = {
- val topic = GroupCoordinator.GroupMetadataTopicName
+ val topic = TopicConstants.GROUP_METADATA_TOPIC_NAME
val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
if (topicData(topic).nonEmpty)
topicData(topic).size
@@ -630,7 +628,7 @@ class GroupMetadataManager(val brokerId: Int,
}
private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
- val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
+ val groupMetadataTopicAndPartition = new TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partition)
val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 452f721..0fb4d74 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,6 +45,7 @@ MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, Of
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.internals.TopicConstants
import scala.collection._
import scala.collection.JavaConverters._
@@ -129,11 +130,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// this callback is invoked under the replica state change lock to ensure proper order of
// leadership changes
updatedLeaders.foreach { partition =>
- if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+ if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
coordinator.handleGroupImmigration(partition.partitionId)
}
updatedFollowers.foreach { partition =>
- if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+ if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
coordinator.handleGroupEmigration(partition.partitionId)
}
}
@@ -643,12 +644,12 @@ class KafkaApis(val requestChannel: RequestChannel,
Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
else
config.offsetsTopicReplicationFactor.toInt
- createTopic(GroupCoordinator.GroupMetadataTopicName, config.offsetsTopicPartitions,
+ createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
}
private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
- val topicMetadata = metadataCache.getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), securityProtocol)
+ val topicMetadata = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
}
@@ -659,7 +660,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
- if (topic == GroupCoordinator.GroupMetadataTopicName) {
+ if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
createGroupMetadataTopic()
} else if (config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index de58e56..f050e27 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,6 @@ package kafka.server
import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{Partition, Replica}
@@ -38,9 +37,9 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time => JTime}
-
import scala.collection._
import scala.collection.JavaConverters._
+import org.apache.kafka.common.internals.TopicConstants
/*
* Result metadata of a log append operation on the log
@@ -395,7 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
// reject appending to internal topics if it is not allowed
- if (Topic.InternalTopics.contains(topicPartition.topic) && !internalTopicsAllowed) {
+ if (TopicConstants.INTERNAL_TOPICS.contains(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b09c541..fad7657 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,7 +17,6 @@ import java.net.Socket
import java.nio.ByteBuffer
import java.util.concurrent.ExecutionException
import java.util.{ArrayList, Collections, Properties}
-
import kafka.cluster.EndPoint
import kafka.common.TopicAndPartition
import kafka.coordinator.GroupCoordinator
@@ -34,10 +33,10 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
+import org.apache.kafka.common.internals.TopicConstants
class AuthorizerIntegrationTest extends KafkaServerTestHarness {
val topic = "topic"
@@ -143,7 +142,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
+ TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
1,
1,
servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 684b38f..f576be5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -13,23 +13,20 @@
package kafka.api
import java.util
-
import kafka.coordinator.GroupCoordinator
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-
import kafka.utils.{TestUtils, Logging, ShutdownableThread}
import kafka.server.KafkaConfig
-
import java.util.ArrayList
import org.junit.Assert._
import org.junit.{Before, Test}
-
import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
+import org.apache.kafka.common.internals.TopicConstants
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
@@ -196,7 +193,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// get metadata for the topic
var parts: Seq[PartitionInfo] = null
while (parts == null)
- parts = consumer0.partitionsFor(GroupCoordinator.GroupMetadataTopicName).asScala
+ parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b4f31c4..d0680b8 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -24,10 +24,10 @@ import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
-
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
@@ -75,7 +75,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
}
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
+ TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 63a6b6f..2bb203d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -19,7 +19,6 @@ package kafka.api
import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
import java.util.{Properties, Random}
-
import kafka.common.Topic
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
@@ -31,6 +30,7 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import org.apache.kafka.common.internals.TopicConstants
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@@ -198,7 +198,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testCannotSendToInternalTopic() {
val thrown = intercept[ExecutionException] {
- producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+ producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](TopicConstants.INTERNAL_TOPICS.iterator.next, "test".getBytes, "test".getBytes)).get
}
assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index b42aaf4..e0107da 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -26,6 +26,7 @@ import kafka.server.ConfigType
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils._
import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@@ -86,12 +87,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
- "--topic", GroupCoordinator.GroupMetadataTopicName))
+ "--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
- // try to delete the GroupCoordinator.GroupMetadataTopicName and make sure it doesn't
- val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.GroupMetadataTopicName))
- val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.GroupMetadataTopicName)
+ // try to delete the TopicConstants.GROUP_METADATA_TOPIC_NAME and make sure the deletion is rejected
+ val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
+ val deleteOffsetTopicPath = getDeleteTopicPath(TopicConstants.GROUP_METADATA_TOPIC_NAME)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 1e8d04e..0e0a06a 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -22,6 +22,7 @@ import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
class TopicFilterTest extends JUnitSuite {
@@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
val topicFilter2 = new Whitelist(".+")
assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
- assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
- assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
+ assertFalse(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+ assertTrue(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
val topicFilter3 = new Whitelist("white_listed-topic.+")
assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
- assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
- assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
+ assertFalse(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 50fa09e..acdb660 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -19,7 +19,6 @@ package kafka.coordinator
import org.apache.kafka.common.record.Record
import org.junit.Assert._
-
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.message.{Message, MessageSet}
import kafka.server.{ReplicaManager, KafkaConfig}
@@ -32,12 +31,11 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.{Capture, IAnswer, EasyMock}
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite
-
import java.util.concurrent.TimeUnit
-
import scala.collection._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}
+import org.apache.kafka.common.internals.TopicConstants
/**
* Test GroupCoordinator responses
@@ -81,12 +79,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
- ret += (GroupCoordinator.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
+ ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
- EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
+ EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
EasyMock.replay(zkUtils)
groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
@@ -834,7 +832,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
- Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+ Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
)
)})
@@ -911,7 +909,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
- Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+ Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
)
)})
@@ -925,7 +923,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
val (responseFuture, responseCallback) = setupHeartbeatCallback
- EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+ EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
EasyMock.replay(replicaManager)