Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/09 03:33:07 UTC

kafka git commit: KAFKA-3954; Consumer should use internal topics information returned by the broker

Repository: kafka
Updated Branches:
  refs/heads/trunk da015585a -> fb65ff40a


KAFKA-3954; Consumer should use internal topics information returned by the broker

Previously, the client hardcoded the set of internal topics; with this change the consumer relies on the per-topic isInternal flag returned in the broker's metadata response.
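
A minimal sketch of the new filtering behaviour (illustrative names; the
committed logic is in the ConsumerCoordinator hunk below). The internal
topic set now comes from the Cluster metadata snapshot rather than from a
client-side constant:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.regex.Pattern;

    import org.apache.kafka.common.Cluster;

    public class InternalTopicFilterSketch {
        // Mirrors the coordinator's pattern-subscription filter: a topic is
        // subscribed only if the regex accepts it and it is not a
        // broker-reported internal topic (when exclusion is enabled).
        static Set<String> topicsToSubscribe(Cluster cluster, Pattern pattern,
                                             boolean excludeInternalTopics) {
            Set<String> topics = new HashSet<>();
            for (String topic : cluster.topics())
                if (pattern.matcher(topic).matches()
                        && !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
                    topics.add(topic);
            return topics;
        }
    }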

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Grant Henke <gr...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1613 from ijuma/kafka-3954-consumer-internal-topics-from-broker


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb65ff40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb65ff40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb65ff40

Branch: refs/heads/trunk
Commit: fb65ff40a8d89bc7ad9b079cf1f2a8b902abd182
Parents: da01558
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Aug 8 20:33:59 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Aug 8 20:33:59 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java |  4 ++-
 .../consumer/internals/ConsumerCoordinator.java |  3 +-
 .../java/org/apache/kafka/common/Cluster.java   | 33 +++++++++++++++---
 .../kafka/common/internals/TopicConstants.java  | 33 ------------------
 .../kafka/common/requests/MetadataResponse.java |  5 ++-
 .../org/apache/kafka/clients/MetadataTest.java  |  4 +++
 .../internals/ConsumerCoordinatorTest.java      |  5 ++-
 .../clients/producer/KafkaProducerTest.java     |  2 ++
 .../clients/producer/MockProducerTest.java      |  2 +-
 .../internals/DefaultPartitionerTest.java       |  3 +-
 .../internals/RecordAccumulatorTest.java        |  2 +-
 .../java/org/apache/kafka/test/TestUtils.java   |  6 +++-
 .../main/scala/kafka/admin/TopicCommand.scala   |  3 +-
 core/src/main/scala/kafka/common/Topic.scala    |  9 +++--
 .../main/scala/kafka/consumer/TopicFilter.scala |  7 ++--
 .../coordinator/GroupMetadataManager.scala      | 27 +++++++--------
 .../src/main/scala/kafka/server/KafkaApis.scala | 36 ++++++++++----------
 .../scala/kafka/server/ReplicaManager.scala     |  3 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  4 +--
 .../kafka/api/BaseConsumerTest.scala            |  4 +--
 .../api/GroupCoordinatorIntegrationTest.scala   |  7 ++--
 .../kafka/api/IntegrationTestHarness.scala      |  7 ++--
 .../kafka/api/ProducerFailureHandlingTest.scala |  8 ++---
 .../unit/kafka/admin/TopicCommandTest.scala     | 11 +++---
 .../unit/kafka/consumer/TopicFilterTest.scala   | 14 ++++----
 .../GroupCoordinatorResponseTest.scala          | 15 ++++----
 .../coordinator/GroupMetadataManagerTest.scala  | 15 ++++----
 .../unit/kafka/server/MetadataRequestTest.scala |  6 ++--
 .../WindowedStreamPartitionerTest.java          |  3 +-
 .../processor/DefaultPartitionGrouperTest.java  |  3 +-
 .../internals/RecordCollectorTest.java          |  3 +-
 .../internals/StreamPartitionAssignorTest.java  |  3 +-
 .../processor/internals/StreamThreadTest.java   |  3 +-
 33 files changed, 148 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 3934627..0fd5d63 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -287,7 +287,9 @@ public final class Metadata {
         Set<String> unauthorizedTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
+        Set<String> internalTopics = Collections.<String>emptySet();
         if (cluster != null) {
+            internalTopics = cluster.internalTopics();
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());
             unauthorizedTopics.retainAll(this.topics.keySet());
 
@@ -299,6 +301,6 @@ public final class Metadata {
             }
             nodes = cluster.nodes();
         }
-        return new Cluster(nodes, partitionInfos, unauthorizedTopics);
+        return new Cluster(nodes, partitionInfos, unauthorizedTopics, internalTopics);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2880efc..b210746 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.internals.TopicConstants;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -155,7 +154,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                     for (String topic : cluster.topics())
                         if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
-                                !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic)))
+                                !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
                             topicsToSubscribe.add(topic);
 
                     subscriptions.changeSubscription(topicsToSubscribe);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index e1bf581..31447c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -32,6 +32,7 @@ public final class Cluster {
     private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
+    private final Set<String> internalTopics;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
     private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -42,17 +43,32 @@ public final class Cluster {
      * Create a new cluster with the given nodes and partitions
      * @param nodes The nodes in the cluster
      * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     * @deprecated Use the Cluster constructor with 4 parameters
      */
+    @Deprecated
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
-        this(false, nodes, partitions, unauthorizedTopics);
+        this(false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet());
+    }
+
+    /**
+     * Create a new cluster with the given nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(Collection<Node> nodes,
+                   Collection<PartitionInfo> partitions,
+                   Set<String> unauthorizedTopics,
+                   Set<String> internalTopics) {
+        this(false, nodes, partitions, unauthorizedTopics, internalTopics);
     }
 
     private Cluster(boolean isBootstrapConfigured,
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
-                    Set<String> unauthorizedTopics) {
+                    Set<String> unauthorizedTopics,
+                    Set<String> internalTopics) {
         this.isBootstrapConfigured = isBootstrapConfigured;
 
         // make a randomized, unmodifiable copy of the nodes
@@ -105,13 +121,15 @@ public final class Cluster {
             this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
 
         this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
+        this.internalTopics = Collections.unmodifiableSet(internalTopics);
     }
 
     /**
      * Create an empty cluster instance with no nodes and no topic-partitions.
      */
     public static Cluster empty() {
-        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
+                Collections.<String>emptySet());
     }
 
     /**
@@ -124,7 +142,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
     }
 
     /**
@@ -133,7 +151,8 @@ public final class Cluster {
     public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
-        return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics));
+        return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics),
+                new HashSet<>(this.internalTopics));
     }
 
     /**
@@ -223,6 +242,10 @@ public final class Cluster {
         return unauthorizedTopics;
     }
 
+    public Set<String> internalTopics() {
+        return internalTopics;
+    }
+
     public boolean isBootstrapConfigured() {
         return isBootstrapConfigured;
     }
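
For callers constructing a Cluster directly, the old three-argument
constructor still compiles but is now deprecated. A hedged usage sketch of
the new four-argument form (hypothetical topic and host names):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    public class ClusterConstructionSketch {
        public static void main(String[] args) {
            Node node = new Node(0, "host1", 1000);
            Cluster cluster = new Cluster(
                    Collections.singletonList(node),
                    Arrays.asList(new PartitionInfo("my-topic", 0, node, null, null)),
                    Collections.<String>emptySet(),               // unauthorized topics
                    Collections.singleton("__consumer_offsets")); // internal topics
            System.out.println(cluster.internalTopics());         // prints [__consumer_offsets]
        }
    }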

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
deleted file mode 100644
index 5d6b992..0000000
--- a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.internals;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-
-public final class TopicConstants {
-
-    //avoid instantiation
-    private TopicConstants() {
-    }
-
-    // TODO: we store both group metadata and offset data here despite the topic name being offsets only
-    public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
-    public static final Collection<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME)));
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 78b35f8..4bf162d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -252,9 +252,12 @@ public class MetadataResponse extends AbstractRequestResponse {
      * @return the cluster snapshot
      */
     public Cluster cluster() {
+        Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
         for (TopicMetadata metadata : topicMetadata) {
             if (metadata.error == Errors.NONE) {
+                if (metadata.isInternal)
+                    internalTopics.add(metadata.topic);
                 for (PartitionMetadata partitionMetadata : metadata.partitionMetadata)
                     partitions.add(new PartitionInfo(
                             metadata.topic,
@@ -265,7 +268,7 @@ public class MetadataResponse extends AbstractRequestResponse {
             }
         }
 
-        return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED));
+        return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
     }
 
     /**
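
On the client side the wiring is then a single call: each topic's
isInternal flag from the metadata response ends up in the Cluster
snapshot. A short sketch (assuming an already-parsed MetadataResponse;
the class name is illustrative):

    import java.util.Set;

    import org.apache.kafka.common.requests.MetadataResponse;

    public class InternalTopicsFromMetadata {
        // Returns the broker-reported internal topic names for this response.
        static Set<String> internalTopics(MetadataResponse response) {
            return response.cluster().internalTopics();
        }
    }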

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5defb13..5eaa737 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -134,6 +134,7 @@ public class MetadataTest {
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
                     new PartitionInfo("topic1", 0, null, null, null)),
+                Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
             100);
 
@@ -161,6 +162,7 @@ public class MetadataTest {
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
                     new PartitionInfo("topic1", 0, null, null, null)),
+                Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
             100);
 
@@ -187,6 +189,7 @@ public class MetadataTest {
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
                     new PartitionInfo("topic1", 0, null, null, null)),
+                Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
             100);
 
@@ -197,6 +200,7 @@ public class MetadataTest {
                 Arrays.asList(
                     new PartitionInfo("topic2", 0, null, null, null),
                     new PartitionInfo("topic3", 0, null, null, null)),
+                Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
             100);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 040824f..176571c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -37,7 +37,6 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.internals.TopicConstants;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -637,7 +636,7 @@ public class ConsumerCoordinatorTest {
     public void testExcludeInternalTopicsConfigOption() { 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
     }
@@ -647,7 +646,7 @@ public class ConsumerCoordinatorTest {
         coordinator = buildCoordinator(new Metrics(), assignors, false, false);
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
 
         assertTrue(subscriptions.partitionAssignmentNeeded());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 1780e2f..740a57d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -154,10 +154,12 @@ public class KafkaProducerTest {
         Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
         final Cluster emptyCluster = new Cluster(nodes,
                 Collections.<PartitionInfo>emptySet(),
+                Collections.<String>emptySet(),
                 Collections.<String>emptySet());
         final Cluster cluster = new Cluster(
                 Collections.singletonList(new Node(0, "host1", 1000)),
                 Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.<String>emptySet(),
                 Collections.<String>emptySet());
 
         // Expect exactly one fetch for each attempt to refresh while topic metadata is not available

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 0a0bdd8..9017869 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -59,7 +59,7 @@ public class MockProducerTest {
     public void testPartitioner() throws Exception {
         PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
         PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
-        Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
+        Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet(), Collections.<String>emptySet());
         MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
         Future<RecordMetadata> metadata = producer.send(record);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index fd8a5bc..9748222 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -37,7 +37,8 @@ public class DefaultPartitionerTest {
     private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
                                                     new PartitionInfo(topic, 2, node1, nodes, nodes),
                                                     new PartitionInfo(topic, 0, node0, nodes, nodes));
-    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet());
+    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet(),
+            Collections.<String>emptySet());
 
     @Test
     public void testKeyPartitionIsStable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 43ac15a..37f87cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,7 +64,7 @@ public class RecordAccumulatorTest {
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
+    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet(), Collections.<String>emptySet());
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs = 1000;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index e5e4d9b..4baa63b 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -53,6 +54,9 @@ public class TestUtils {
     public static final String DIGITS = "0123456789";
     public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
 
+    public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
+    public static final Set<String> INTERNAL_TOPICS = Collections.singleton(GROUP_METADATA_TOPIC_NAME);
+
     /* A consistent random number generator to make tests repeatable */
     public static final Random SEEDED_RANDOM = new Random(192348092834L);
     public static final Random RANDOM = new Random();
@@ -77,7 +81,7 @@ public class TestUtils {
             for (int i = 0; i < partitions; i++)
                 parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
         }
-        return new Cluster(asList(ns), parts, Collections.<String>emptySet());
+        return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
     }
 
     public static Cluster clusterWith(int nodes, String topic, int partitions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 57a5458..657f26c 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConversions._
 import scala.collection._
 import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
-import org.apache.kafka.common.internals.TopicConstants
 
 
 object TopicCommand extends Logging {
@@ -138,7 +137,7 @@ object TopicCommand extends Logging {
       }
 
       if(opts.options.has(opts.partitionsOpt)) {
-        if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
+        if (topic == Topic.GroupMetadataTopicName) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
         }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 054c5eb..4a65afb 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,9 +18,14 @@
 package kafka.common
 
 import util.matching.Regex
-import org.apache.kafka.common.internals.TopicConstants.INTERNAL_TOPICS
+
+import scala.collection.immutable
 
 object Topic {
+
+  val GroupMetadataTopicName = "__consumer_offsets"
+  val InternalTopics = immutable.Set(GroupMetadataTopicName)
+
   val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 249
   private val rgx = new Regex(legalChars + "+")
@@ -63,6 +68,6 @@ object Topic {
   }
 
   def isInternal(topic: String): Boolean =
-    INTERNAL_TOPICS.contains(topic)
+    InternalTopics.contains(topic)
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index b89968e..1ab4e5c 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -17,12 +17,9 @@
 
 package kafka.consumer
 
-
 import kafka.utils.Logging
 import java.util.regex.{PatternSyntaxException, Pattern}
 import kafka.common.Topic
-import org.apache.kafka.common.internals.TopicConstants
-
 
 sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
@@ -48,7 +45,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
-    val allowed = topic.matches(regex) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
+    val allowed = topic.matches(regex) && !(Topic.isInternal(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))
@@ -61,7 +58,7 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
-    val allowed = (!topic.matches(regex)) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
+    val allowed = (!topic.matches(regex)) && !(Topic.isInternal(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e75e23b..ef8b295 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -48,7 +48,6 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.utils.CoreUtils.inLock
-import org.apache.kafka.common.internals.TopicConstants
 
 
 class GroupMetadataManager(val brokerId: Int,
@@ -153,9 +152,9 @@ class GroupMetadataManager(val brokerId: Int,
       val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
         timestamp = timestamp, magicValue = magicValue)
 
-      val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
+      val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
       partitionOpt.foreach { partition =>
-        val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
+        val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
 
         trace("Marking group %s as deleted.".format(group.groupId))
 
@@ -182,7 +181,7 @@ class GroupMetadataManager(val brokerId: Int,
       timestamp = timestamp,
       magicValue = magicValue)
 
-    val groupMetadataPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+    val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
 
     val groupMetadataMessageSet = Map(groupMetadataPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -268,7 +267,7 @@ class GroupMetadataManager(val brokerId: Int,
       )
     }.toSeq
 
-    val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+    val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -378,7 +377,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def loadGroupsForPartition(offsetsPartition: Int,
                              onGroupLoaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+    val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
     scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
 
     def loadGroupsAndOffsets() {
@@ -520,7 +519,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+    val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
     scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
 
     def removeGroupsAndOffsets() {
@@ -543,10 +542,10 @@ class GroupMetadataManager(val brokerId: Int,
       }
 
       if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-        .format(numOffsetsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
+        .format(numOffsetsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
 
       if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
-        .format(numGroupsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
+        .format(numGroupsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
     }
   }
 
@@ -568,9 +567,9 @@ class GroupMetadataManager(val brokerId: Int,
             new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
           }.toBuffer
 
-          val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+          val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
           partitionOpt.foreach { partition =>
-            val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+            val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
             trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
 
             try {
@@ -600,7 +599,7 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionId)
+    val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, partitionId)
 
     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -630,7 +629,7 @@ class GroupMetadataManager(val brokerId: Int,
    * If the topic does not exist, the configured partition count is returned.
    */
   private def getOffsetsTopicPartitionCount = {
-    val topic = TopicConstants.GROUP_METADATA_TOPIC_NAME
+    val topic = Topic.GroupMetadataTopicName
     val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
     if (topicData(topic).nonEmpty)
       topicData(topic).size
@@ -639,7 +638,7 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
-    val groupMetadataTopicAndPartition = new TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partition)
+    val groupMetadataTopicAndPartition = new TopicAndPartition(Topic.GroupMetadataTopicName, partition)
     val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
       throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5cadb8b..0c85de0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,7 +32,8 @@ import kafka.log._
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
-import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Topic, Write}
+import kafka.security.auth
+import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
@@ -41,7 +42,6 @@ import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequ
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.internals.TopicConstants
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -131,11 +131,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         // this callback is invoked under the replica state change lock to ensure proper order of
         // leadership changes
         updatedLeaders.foreach { partition =>
-          if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
+          if (partition.topic == Topic.GroupMetadataTopicName)
             coordinator.handleGroupImmigration(partition.partitionId)
         }
         updatedFollowers.foreach { partition =>
-          if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
+          if (partition.topic == Topic.GroupMetadataTopicName)
             coordinator.handleGroupEmigration(partition.partitionId)
         }
       }
@@ -238,7 +238,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
 
       val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
-        case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+        case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
       }
 
       // the callback for sending an offset commit response
@@ -335,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Write, new Resource(Topic, topicPartition.topic))
+      case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
     }
 
     // the callback for sending a produce response
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
-      case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
+      case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic))
     }
 
     val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
@@ -518,7 +518,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
@@ -636,14 +636,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
       info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
         .format(topic, numPartitions, replicationFactor))
-      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
         java.util.Collections.emptyList())
     } catch {
       case e: TopicExistsException => // let it go, possibly another broker created this topic
-        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
           java.util.Collections.emptyList())
       case itex: InvalidTopicException =>
-        new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic),
+        new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, Topic.isInternal(topic),
           java.util.Collections.emptyList())
     }
   }
@@ -655,12 +655,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
       else
         config.offsetsTopicReplicationFactor.toInt
-    createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, config.offsetsTopicPartitions,
+    createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
       offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
   }
 
   private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
+    val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), securityProtocol)
     topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
   }
 
@@ -671,12 +671,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
+        if (topic == Topic.GroupMetadataTopicName) {
           createGroupMetadataTopic()
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
-          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic),
             java.util.Collections.emptyList())
         }
       }
@@ -706,7 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
     var (authorizedTopics, unauthorizedTopics) =
-      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
+      topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic)))
 
     if (authorizedTopics.nonEmpty) {
       val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
@@ -721,7 +721,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
-      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
+      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
         java.util.Collections.emptyList()))
 
     // In version 0, we returned an error when brokers with replicas were unavailable,
@@ -767,7 +767,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       new OffsetFetchResponse(results.asJava)
     } else {
       val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
-        authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+        authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
       }
       val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
       val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 77df029..2b97783 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordEx
                                         RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException,
                                         UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
@@ -539,7 +538,7 @@ class ReplicaManager(val config: KafkaConfig,
           val initialLogEndOffset = localReplica.logEndOffset
           val logReadInfo = localReplica.log match {
             case Some(log) =>
-              val adjustedFetchSize = if (TopicConstants.INTERNAL_TOPICS.contains(topic) && !readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else fetchSize
+              val adjustedFetchSize = if (Topic.isInternal(topic) && !readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else fetchSize
               log.read(offset, adjustedFetchSize, maxOffsetOpt)
             case None =>
               error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 8f23c49..f35151f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
 
 import kafka.cluster.EndPoint
+import kafka.common
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._
@@ -38,7 +39,6 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
-import org.apache.kafka.common.internals.TopicConstants
 
 class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val topic = "topic"
@@ -149,7 +149,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
+    TestUtils.createTopic(zkUtils, common.Topic.GroupMetadataTopicName,
       1,
       1,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index ea74d5d..f039750 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,13 +19,13 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import kafka.utils.{TestUtils, Logging, ShutdownableThread}
+import kafka.common.Topic
 import kafka.server.KafkaConfig
 import java.util.ArrayList
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
-import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.clients.producer.KafkaProducer
 
 /**
@@ -192,7 +192,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     // get metadata for the topic
     var parts: Seq[PartitionInfo] = null
     while (parts == null)
-      parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
+      parts = consumer0.partitionsFor(Topic.GroupMetadataTopicName).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts.head.leader())
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 9183d0f..f36e146 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -12,7 +12,7 @@
  */
 package integration.kafka.api
 
-import kafka.common.TopicAndPartition
+import kafka.common.{Topic, TopicAndPartition}
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.Log
 import kafka.message.GZIPCompressionCodec
@@ -20,7 +20,6 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
 import org.junit.Assert._
@@ -43,13 +42,13 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
                                                securityProtocol = SecurityProtocol.PLAINTEXT)
     val offsetMap = Map(
-      new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
+      new TopicPartition(Topic.GroupMetadataTopicName, 0) -> new OffsetAndMetadata(10, "")
     ).asJava
     consumer.commitSync(offsetMap)
     val logManager = servers.head.getLogManager
 
     def getGroupMetadataLogOpt: Option[Log] =
-      logManager.getLog(TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0))
+      logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0))
 
     TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)),
                             "Commit message not appended in time")

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 29d3bd6..9595ad6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -21,13 +21,14 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import kafka.utils.TestUtils
 import java.util.Properties
+
+import kafka.common.Topic
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
 import org.junit.{After, Before}
+
 import scala.collection.mutable.Buffer
-import kafka.coordinator.GroupCoordinator
-import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -77,7 +78,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     }
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
+    TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7a22c73..c8fcba6 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,8 +17,8 @@
 
 package kafka.api
 
-import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
-import java.util.{Properties, Random}
+import java.util.concurrent.{ExecutionException, TimeoutException}
+import java.util.{Properties}
 import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
@@ -26,11 +26,9 @@ import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
-import org.apache.kafka.common.internals.TopicConstants
 
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -201,7 +199,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testCannotSendToInternalTopic() {
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](TopicConstants.INTERNAL_TOPICS.iterator.next, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
     }
     assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
   }

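One subtlety in the hunk above: TopicConstants.INTERNAL_TOPICS was a java.util.Set, so the old code reached for iterator.next to pick an element, while Topic.InternalTopics is a Scala Set and exposes head directly. A minimal equivalence sketch, assuming the Topic object outlined earlier:

    // picks an arbitrary internal topic; equivalent to the old
    // TopicConstants.INTERNAL_TOPICS.iterator.next on the Java set
    val anInternalTopic: String = Topic.InternalTopics.head
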
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 11dc36e..8ce7c90 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -18,16 +18,15 @@ package kafka.admin
 
 import org.junit.Assert._
 import org.junit.Test
+import kafka.common.Topic
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
-import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.errors.TopicExistsException
 
-
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   @Test
@@ -87,12 +86,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
-      "--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
+      "--topic", Topic.GroupMetadataTopicName))
     TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
 
-    // try to delete the TopicConstants.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
-    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
-    val deleteOffsetTopicPath = getDeleteTopicPath(TopicConstants.GROUP_METADATA_TOPIC_NAME)
+    // try to delete the internal offsets topic (Topic.GroupMetadataTopicName) and make sure deletion is rejected
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GroupMetadataTopicName))
+    val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GroupMetadataTopicName)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
         TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)

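The intercept[AdminOperationException] above relies on TopicCommand refusing to delete internal topics. A minimal sketch of the guard the test exercises, assuming deleteTopic consults Topic.InternalTopics (the actual error message may differ):

    // sketch of the check inside TopicCommand.deleteTopic
    if (Topic.InternalTopics.contains(topic))
      throw new AdminOperationException(
        s"Topic $topic is a Kafka internal topic and cannot be deleted.")
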
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 0e0a06a..1e073aa 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -21,9 +21,7 @@ package kafka.consumer
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
-import kafka.coordinator.GroupCoordinator
-import org.apache.kafka.common.internals.TopicConstants
-
+import kafka.common.Topic
 
 class TopicFilterTest extends JUnitSuite {
 
@@ -38,8 +36,8 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter2 = new Whitelist(".+")
     assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
-    assertFalse(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
-    assertTrue(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
+    assertFalse(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -58,8 +56,8 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
-    assertFalse(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
-    assertTrue(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false))
   }
 
   @Test
@@ -83,4 +81,4 @@ class TopicFilterTest extends JUnitSuite {
     assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-"))
     assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-"))
   }    
-}
\ No newline at end of file
+}

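For context on what these assertions pin down: a whitelist admits a topic when it matches the filter's regex and, when excludeInternalTopics is set, the topic is not in the internal set. A minimal sketch of that predicate, assuming the filter holds a plain regex string and the Topic object outlined earlier (the real TopicFilter also normalises the raw pattern):

    // sketch of the whitelist predicate, not the actual TopicFilter code
    class Whitelist(regex: String) {
      def isTopicAllowed(topic: String, excludeInternalTopics: Boolean): Boolean =
        topic.matches(regex) &&
          !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
    }

A blacklist inverts the regex half of the predicate while keeping the internal-topic check the same.
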
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index fa13a92..c917ca4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.record.Record
 import org.junit.Assert._
-import kafka.common.OffsetAndMetadata
+import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.message.{Message, MessageSet}
 import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
 import kafka.utils._
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit
 import scala.collection._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise}
-import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * Test GroupCoordinator responses
@@ -80,12 +79,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
+    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
 
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
+    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
     timer = new MockTimer
@@ -307,7 +306,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
     EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -931,7 +930,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
           new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
@@ -1009,7 +1008,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
           new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
@@ -1023,7 +1022,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
-    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
     EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 0bd6d71..b9569ca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -18,13 +18,12 @@
 package kafka.coordinator
 
 import kafka.cluster.Partition
-import kafka.common.OffsetAndMetadata
+import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.log.LogAppendInfo
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.Record
 import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -67,10 +66,10 @@ class GroupMetadataManagerTest {
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
+    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
+    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
     time = new MockTime
@@ -236,7 +235,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -290,7 +289,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -351,7 +350,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -396,7 +395,7 @@ class GroupMetadataManagerTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
           new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
         )
       )})

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 55eb6f8..46a79de 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import java.util.Properties
 
+import kafka.common.Topic
 import kafka.utils.TestUtils
-import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.Assert._
@@ -67,7 +67,7 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testIsInternal() {
-    val internalTopic = TopicConstants.GROUP_METADATA_TOPIC_NAME
+    val internalTopic = Topic.GroupMetadataTopicName
     val notInternalTopic = "notInternal"
     // create the topics
     TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
@@ -82,6 +82,8 @@ class MetadataRequestTest extends BaseRequestTest {
 
     assertTrue("internalTopic should show isInternal", internalTopicMetadata.isInternal)
     assertFalse("notInternalTopic topic not should show isInternal", notInternalTopicMetadata.isInternal)
+
+    assertEquals(Set(internalTopic).asJava, metadataResponse.cluster.internalTopics)
   }
 
   @Test

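The added assertion at the end of testIsInternal checks the piece this commit actually introduces: the broker flags internal topics in the metadata response and the client-side Cluster collects them. A minimal sketch of that collection step, assuming per-topic metadata carries an isInternal flag as the MetadataResponse hunk in this commit suggests:

    import scala.collection.JavaConverters._

    // gather the internal topics advertised by the broker (sketch)
    val internalTopics: Set[String] = metadataResponse.topicMetadata.asScala
      .filter(_.isInternal)
      .map(_.topic)
      .toSet
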
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index b31b20d..ba6289c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -49,7 +49,8 @@ public class WindowedStreamPartitionerTest {
             new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
+        Collections.<String>emptySet());
 
     @Test
     public void testCopartitioning() {

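Each of the streams tests below changes in the same mechanical way: Cluster now carries the broker-reported internal topics alongside the unauthorized ones, so its constructor takes one more set. A minimal construction sketch from Scala, assuming the parameter order (nodes, partitions, unauthorizedTopics, internalTopics) implied by these hunks:

    import java.util.Collections
    import org.apache.kafka.common.{Cluster, Node, PartitionInfo}

    val infos = Collections.singletonList(
      new PartitionInfo("topic1", 0, Node.noNode(), Array.empty[Node], Array.empty[Node]))
    val cluster = new Cluster(
      Collections.singletonList(Node.noNode()),
      infos,
      Collections.emptySet[String](),  // unauthorized topics
      Collections.emptySet[String]())  // internal topics (new in this commit)
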
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index c11d0c1..9683da9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -44,7 +44,8 @@ public class DefaultPartitionGrouperTest {
             new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
+        Collections.<String>emptySet());
 
     @Test
     public void testGrouping() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 32f2598..b1a4a02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -47,7 +47,8 @@ public class RecordCollectorTest {
             new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos,
+            Collections.<String>emptySet(), Collections.<String>emptySet());
 
 
     private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 4f7037c..21de73a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -78,7 +78,8 @@ public class StreamPartitionAssignorTest {
             new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+        Collections.<String>emptySet());
 
     private final TaskId task0 = new TaskId(0, 0);
     private final TaskId task1 = new TaskId(0, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cb3dee0..d1aaa07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -81,7 +81,8 @@ public class StreamThreadTest {
             new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+            Collections.<String>emptySet());
 
     private final PartitionAssignor.Subscription subscription =
             new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData());