You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/09 03:33:07 UTC
kafka git commit: KAFKA-3954; Consumer should use internal topics information returned by the broker
Repository: kafka
Updated Branches:
refs/heads/trunk da015585a -> fb65ff40a
KAFKA-3954; Consumer should use internal topics information returned by the broker
Previously, the consumer hardcoded the set of internal topics instead of using the information returned by the broker.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Grant Henke <gr...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1613 from ijuma/kafka-3954-consumer-internal-topics-from-broker
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb65ff40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb65ff40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb65ff40
Branch: refs/heads/trunk
Commit: fb65ff40a8d89bc7ad9b079cf1f2a8b902abd182
Parents: da01558
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Aug 8 20:33:59 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Aug 8 20:33:59 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 4 ++-
.../consumer/internals/ConsumerCoordinator.java | 3 +-
.../java/org/apache/kafka/common/Cluster.java | 33 +++++++++++++++---
.../kafka/common/internals/TopicConstants.java | 33 ------------------
.../kafka/common/requests/MetadataResponse.java | 5 ++-
.../org/apache/kafka/clients/MetadataTest.java | 4 +++
.../internals/ConsumerCoordinatorTest.java | 5 ++-
.../clients/producer/KafkaProducerTest.java | 2 ++
.../clients/producer/MockProducerTest.java | 2 +-
.../internals/DefaultPartitionerTest.java | 3 +-
.../internals/RecordAccumulatorTest.java | 2 +-
.../java/org/apache/kafka/test/TestUtils.java | 6 +++-
.../main/scala/kafka/admin/TopicCommand.scala | 3 +-
core/src/main/scala/kafka/common/Topic.scala | 9 +++--
.../main/scala/kafka/consumer/TopicFilter.scala | 7 ++--
.../coordinator/GroupMetadataManager.scala | 27 +++++++--------
.../src/main/scala/kafka/server/KafkaApis.scala | 36 ++++++++++----------
.../scala/kafka/server/ReplicaManager.scala | 3 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +--
.../kafka/api/BaseConsumerTest.scala | 4 +--
.../api/GroupCoordinatorIntegrationTest.scala | 7 ++--
.../kafka/api/IntegrationTestHarness.scala | 7 ++--
.../kafka/api/ProducerFailureHandlingTest.scala | 8 ++---
.../unit/kafka/admin/TopicCommandTest.scala | 11 +++---
.../unit/kafka/consumer/TopicFilterTest.scala | 14 ++++----
.../GroupCoordinatorResponseTest.scala | 15 ++++----
.../coordinator/GroupMetadataManagerTest.scala | 15 ++++----
.../unit/kafka/server/MetadataRequestTest.scala | 6 ++--
.../WindowedStreamPartitionerTest.java | 3 +-
.../processor/DefaultPartitionGrouperTest.java | 3 +-
.../internals/RecordCollectorTest.java | 3 +-
.../internals/StreamPartitionAssignorTest.java | 3 +-
.../processor/internals/StreamThreadTest.java | 3 +-
33 files changed, 148 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 3934627..0fd5d63 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -287,7 +287,9 @@ public final class Metadata {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
+ Set<String> internalTopics = Collections.<String>emptySet();
if (cluster != null) {
+ internalTopics = cluster.internalTopics();
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics.keySet());
@@ -299,6 +301,6 @@ public final class Metadata {
}
nodes = cluster.nodes();
}
- return new Cluster(nodes, partitionInfos, unauthorizedTopics);
+ return new Cluster(nodes, partitionInfos, unauthorizedTopics, internalTopics);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2880efc..b210746 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -155,7 +154,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
for (String topic : cluster.topics())
if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
- !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic)))
+ !(excludeInternalTopics && cluster.internalTopics().contains(topic)))
topicsToSubscribe.add(topic);
subscriptions.changeSubscription(topicsToSubscribe);
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index e1bf581..31447c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -32,6 +32,7 @@ public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
+ private final Set<String> internalTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -42,17 +43,32 @@ public final class Cluster {
* Create a new cluster with the given nodes and partitions
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
+ * @deprecated Use the Cluster constructor with 4 parameters
*/
+ @Deprecated
public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) {
- this(false, nodes, partitions, unauthorizedTopics);
+ this(false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet());
+ }
+
+ /**
+ * Create a new cluster with the given nodes and partitions
+ * @param nodes The nodes in the cluster
+ * @param partitions Information about a subset of the topic-partitions this cluster hosts
+ */
+ public Cluster(Collection<Node> nodes,
+ Collection<PartitionInfo> partitions,
+ Set<String> unauthorizedTopics,
+ Set<String> internalTopics) {
+ this(false, nodes, partitions, unauthorizedTopics, internalTopics);
}
private Cluster(boolean isBootstrapConfigured,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
- Set<String> unauthorizedTopics) {
+ Set<String> unauthorizedTopics,
+ Set<String> internalTopics) {
this.isBootstrapConfigured = isBootstrapConfigured;
// make a randomized, unmodifiable copy of the nodes
@@ -105,13 +121,15 @@ public final class Cluster {
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
+ this.internalTopics = Collections.unmodifiableSet(internalTopics);
}
/**
* Create an empty cluster instance with no nodes and no topic-partitions.
*/
public static Cluster empty() {
- return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+ return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
+ Collections.<String>emptySet());
}
/**
@@ -124,7 +142,7 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
- return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+ return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
}
/**
@@ -133,7 +151,8 @@ public final class Cluster {
public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
combinedPartitions.putAll(partitions);
- return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics));
+ return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics),
+ new HashSet<>(this.internalTopics));
}
/**
@@ -223,6 +242,10 @@ public final class Cluster {
return unauthorizedTopics;
}
+ public Set<String> internalTopics() {
+ return internalTopics;
+ }
+
public boolean isBootstrapConfigured() {
return isBootstrapConfigured;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
deleted file mode 100644
index 5d6b992..0000000
--- a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.internals;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-
-public final class TopicConstants {
-
- //avoid instantiation
- private TopicConstants() {
- }
-
- // TODO: we store both group metadata and offset data here despite the topic name being offsets only
- public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
- public static final Collection<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME)));
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 78b35f8..4bf162d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -252,9 +252,12 @@ public class MetadataResponse extends AbstractRequestResponse {
* @return the cluster snapshot
*/
public Cluster cluster() {
+ Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
for (TopicMetadata metadata : topicMetadata) {
if (metadata.error == Errors.NONE) {
+ if (metadata.isInternal)
+ internalTopics.add(metadata.topic);
for (PartitionMetadata partitionMetadata : metadata.partitionMetadata)
partitions.add(new PartitionInfo(
metadata.topic,
@@ -265,7 +268,7 @@ public class MetadataResponse extends AbstractRequestResponse {
}
}
- return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED));
+ return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5defb13..5eaa737 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -134,6 +134,7 @@ public class MetadataTest {
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null)),
+ Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
@@ -161,6 +162,7 @@ public class MetadataTest {
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null)),
+ Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
@@ -187,6 +189,7 @@ public class MetadataTest {
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null)),
+ Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
@@ -197,6 +200,7 @@ public class MetadataTest {
Arrays.asList(
new PartitionInfo("topic2", 0, null, null, null),
new PartitionInfo("topic3", 0, null, null, null)),
+ Collections.<String>emptySet(),
Collections.<String>emptySet()),
100);
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 040824f..176571c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -37,7 +37,6 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -637,7 +636,7 @@ public class ConsumerCoordinatorTest {
public void testExcludeInternalTopicsConfigOption() {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
- metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+ metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
assertFalse(subscriptions.partitionAssignmentNeeded());
}
@@ -647,7 +646,7 @@ public class ConsumerCoordinatorTest {
coordinator = buildCoordinator(new Metrics(), assignors, false, false);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
- metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+ metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
assertTrue(subscriptions.partitionAssignmentNeeded());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 1780e2f..740a57d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -154,10 +154,12 @@ public class KafkaProducerTest {
Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
final Cluster emptyCluster = new Cluster(nodes,
Collections.<PartitionInfo>emptySet(),
+ Collections.<String>emptySet(),
Collections.<String>emptySet());
final Cluster cluster = new Cluster(
Collections.singletonList(new Node(0, "host1", 1000)),
Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+ Collections.<String>emptySet(),
Collections.<String>emptySet());
// Expect exactly one fetch for each attempt to refresh while topic metadata is not available
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 0a0bdd8..9017869 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -59,7 +59,7 @@ public class MockProducerTest {
public void testPartitioner() throws Exception {
PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
- Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
+ Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet(), Collections.<String>emptySet());
MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
Future<RecordMetadata> metadata = producer.send(record);
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index fd8a5bc..9748222 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -37,7 +37,8 @@ public class DefaultPartitionerTest {
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
new PartitionInfo(topic, 2, node1, nodes, nodes),
new PartitionInfo(topic, 0, node0, nodes, nodes));
- private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet());
+ private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet(),
+ Collections.<String>emptySet());
@Test
public void testKeyPartitionIsStable() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 43ac15a..37f87cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,7 +64,7 @@ public class RecordAccumulatorTest {
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
- private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
+ private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet(), Collections.<String>emptySet());
private Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 1000;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index e5e4d9b..4baa63b 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -53,6 +54,9 @@ public class TestUtils {
public static final String DIGITS = "0123456789";
public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+ public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
+ public static final Set<String> INTERNAL_TOPICS = Collections.singleton(GROUP_METADATA_TOPIC_NAME);
+
/* A consistent random number generator to make tests repeatable */
public static final Random SEEDED_RANDOM = new Random(192348092834L);
public static final Random RANDOM = new Random();
@@ -77,7 +81,7 @@ public class TestUtils {
for (int i = 0; i < partitions; i++)
parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
}
- return new Cluster(asList(ns), parts, Collections.<String>emptySet());
+ return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
}
public static Cluster clusterWith(int nodes, String topic, int partitions) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 57a5458..657f26c 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConversions._
import scala.collection._
import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
-import org.apache.kafka.common.internals.TopicConstants
object TopicCommand extends Logging {
@@ -138,7 +137,7 @@ object TopicCommand extends Logging {
}
if(opts.options.has(opts.partitionsOpt)) {
- if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
+ if (topic == Topic.GroupMetadataTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
}
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 054c5eb..4a65afb 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,9 +18,14 @@
package kafka.common
import util.matching.Regex
-import org.apache.kafka.common.internals.TopicConstants.INTERNAL_TOPICS
+
+import scala.collection.immutable
object Topic {
+
+ val GroupMetadataTopicName = "__consumer_offsets"
+ val InternalTopics = immutable.Set(GroupMetadataTopicName)
+
val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 249
private val rgx = new Regex(legalChars + "+")
@@ -63,6 +68,6 @@ object Topic {
}
def isInternal(topic: String): Boolean =
- INTERNAL_TOPICS.contains(topic)
+ InternalTopics.contains(topic)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index b89968e..1ab4e5c 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -17,12 +17,9 @@
package kafka.consumer
-
import kafka.utils.Logging
import java.util.regex.{PatternSyntaxException, Pattern}
import kafka.common.Topic
-import org.apache.kafka.common.internals.TopicConstants
-
sealed abstract class TopicFilter(rawRegex: String) extends Logging {
@@ -48,7 +45,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
- val allowed = topic.matches(regex) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
+ val allowed = topic.matches(regex) && !(Topic.isInternal(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
@@ -61,7 +58,7 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
- val allowed = (!topic.matches(regex)) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
+ val allowed = (!topic.matches(regex)) && !(Topic.isInternal(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e75e23b..ef8b295 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -48,7 +48,6 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.utils.CoreUtils.inLock
-import org.apache.kafka.common.internals.TopicConstants
class GroupMetadataManager(val brokerId: Int,
@@ -153,9 +152,9 @@ class GroupMetadataManager(val brokerId: Int,
val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
timestamp = timestamp, magicValue = magicValue)
- val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
+ val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
+ val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
trace("Marking group %s as deleted.".format(group.groupId))
@@ -182,7 +181,7 @@ class GroupMetadataManager(val brokerId: Int,
timestamp = timestamp,
magicValue = magicValue)
- val groupMetadataPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+ val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
val groupMetadataMessageSet = Map(groupMetadataPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -268,7 +267,7 @@ class GroupMetadataManager(val brokerId: Int,
)
}.toSeq
- val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+ val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -378,7 +377,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def loadGroupsForPartition(offsetsPartition: Int,
onGroupLoaded: GroupMetadata => Unit) {
- val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
def loadGroupsAndOffsets() {
@@ -520,7 +519,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
- val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
def removeGroupsAndOffsets() {
@@ -543,10 +542,10 @@ class GroupMetadataManager(val brokerId: Int,
}
if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
- .format(numOffsetsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
+ .format(numOffsetsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
- .format(numGroupsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
+ .format(numGroupsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
}
}
@@ -568,9 +567,9 @@ class GroupMetadataManager(val brokerId: Int,
new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
}.toBuffer
- val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
try {
@@ -600,7 +599,7 @@ class GroupMetadataManager(val brokerId: Int,
}
private def getHighWatermark(partitionId: Int): Long = {
- val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionId)
+ val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, partitionId)
val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -630,7 +629,7 @@ class GroupMetadataManager(val brokerId: Int,
* If the topic does not exist, the configured partition count is returned.
*/
private def getOffsetsTopicPartitionCount = {
- val topic = TopicConstants.GROUP_METADATA_TOPIC_NAME
+ val topic = Topic.GroupMetadataTopicName
val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
if (topicData(topic).nonEmpty)
topicData(topic).size
@@ -639,7 +638,7 @@ class GroupMetadataManager(val brokerId: Int,
}
private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
- val groupMetadataTopicAndPartition = new TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partition)
+ val groupMetadataTopicAndPartition = new TopicAndPartition(Topic.GroupMetadataTopicName, partition)
val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5cadb8b..0c85de0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,7 +32,8 @@ import kafka.log._
import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.network._
import kafka.network.RequestChannel.{Response, Session}
-import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Topic, Write}
+import kafka.security.auth
+import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
import org.apache.kafka.common.metrics.Metrics
@@ -41,7 +42,6 @@ import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequ
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.internals.TopicConstants
import scala.collection._
import scala.collection.JavaConverters._
@@ -131,11 +131,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// this callback is invoked under the replica state change lock to ensure proper order of
// leadership changes
updatedLeaders.foreach { partition =>
- if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
+ if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
updatedFollowers.foreach { partition =>
- if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
+ if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupEmigration(partition.partitionId)
}
}
@@ -238,7 +238,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
- case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+ case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
}
// the callback for sending an offset commit response
@@ -335,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
- case (topicPartition, _) => authorize(request.session, Write, new Resource(Topic, topicPartition.topic))
+ case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
}
// the callback for sending a produce response
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
- case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
+ case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic))
}
val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
@@ -518,7 +518,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
- case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+ case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
@@ -636,14 +636,14 @@ class KafkaApis(val requestChannel: RequestChannel,
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
- new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+ new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
} catch {
case e: TopicExistsException => // let it go, possibly another broker created this topic
- new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
+ new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
case itex: InvalidTopicException =>
- new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic),
+ new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
}
}
@@ -655,12 +655,12 @@ class KafkaApis(val requestChannel: RequestChannel,
Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
else
config.offsetsTopicReplicationFactor.toInt
- createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, config.offsetsTopicPartitions,
+ createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
}
private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
- val topicMetadata = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
+ val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), securityProtocol)
topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
}
@@ -671,12 +671,12 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
- if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
+ if (topic == Topic.GroupMetadataTopicName) {
createGroupMetadataTopic()
} else if (config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
- new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
+ new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
}
}
@@ -706,7 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
var (authorizedTopics, unauthorizedTopics) =
- topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
+ topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic)))
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
@@ -721,7 +721,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
- new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
+ new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
java.util.Collections.emptyList()))
// In version 0, we returned an error when brokers with replicas were unavailable,
@@ -767,7 +767,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new OffsetFetchResponse(results.asJava)
} else {
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
- authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+ authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
}
val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 77df029..2b97783 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordEx
RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
@@ -539,7 +538,7 @@ class ReplicaManager(val config: KafkaConfig,
val initialLogEndOffset = localReplica.logEndOffset
val logReadInfo = localReplica.log match {
case Some(log) =>
- val adjustedFetchSize = if (TopicConstants.INTERNAL_TOPICS.contains(topic) && !readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else fetchSize
+ val adjustedFetchSize = if (Topic.isInternal(topic) && !readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else fetchSize
log.read(offset, adjustedFetchSize, maxOffsetOpt)
case None =>
error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 8f23c49..f35151f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException
import java.util.{ArrayList, Collections, Properties}
import kafka.cluster.EndPoint
+import kafka.common
import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.security.auth._
@@ -38,7 +39,6 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
-import org.apache.kafka.common.internals.TopicConstants
class AuthorizerIntegrationTest extends KafkaServerTestHarness {
val topic = "topic"
@@ -149,7 +149,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
+ TestUtils.createTopic(zkUtils, common.Topic.GroupMetadataTopicName,
1,
1,
servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index ea74d5d..f039750 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,13 +19,13 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import kafka.utils.{TestUtils, Logging, ShutdownableThread}
+import kafka.common.Topic
import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
import org.junit.{Before, Test}
import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
-import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.clients.producer.KafkaProducer
/**
@@ -192,7 +192,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// get metadata for the topic
var parts: Seq[PartitionInfo] = null
while (parts == null)
- parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
+ parts = consumer0.partitionsFor(Topic.GroupMetadataTopicName).asScala
assertEquals(1, parts.size)
assertNotNull(parts.head.leader())
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 9183d0f..f36e146 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -12,7 +12,7 @@
*/
package integration.kafka.api
-import kafka.common.TopicAndPartition
+import kafka.common.{Topic, TopicAndPartition}
import kafka.integration.KafkaServerTestHarness
import kafka.log.Log
import kafka.message.GZIPCompressionCodec
@@ -20,7 +20,6 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Test
import org.junit.Assert._
@@ -43,13 +42,13 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
securityProtocol = SecurityProtocol.PLAINTEXT)
val offsetMap = Map(
- new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
+ new TopicPartition(Topic.GroupMetadataTopicName, 0) -> new OffsetAndMetadata(10, "")
).asJava
consumer.commitSync(offsetMap)
val logManager = servers.head.getLogManager
def getGroupMetadataLogOpt: Option[Log] =
- logManager.getLog(TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0))
+ logManager.getLog(TopicAndPartition(Topic.GroupMetadataTopicName, 0))
TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)),
"Commit message not appended in time")
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 29d3bd6..9595ad6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -21,13 +21,14 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import kafka.utils.TestUtils
import java.util.Properties
+
+import kafka.common.Topic
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import org.junit.{After, Before}
+
import scala.collection.mutable.Buffer
-import kafka.coordinator.GroupCoordinator
-import org.apache.kafka.common.internals.TopicConstants
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
@@ -77,7 +78,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
}
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
+ TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName,
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7a22c73..c8fcba6 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,8 +17,8 @@
package kafka.api
-import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
-import java.util.{Properties, Random}
+import java.util.concurrent.{ExecutionException, TimeoutException}
+import java.util.{Properties}
import kafka.common.Topic
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
@@ -26,11 +26,9 @@ import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
import org.junit.Assert._
import org.junit.{After, Before, Test}
-import org.apache.kafka.common.internals.TopicConstants
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@@ -201,7 +199,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testCannotSendToInternalTopic() {
val thrown = intercept[ExecutionException] {
- producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](TopicConstants.INTERNAL_TOPICS.iterator.next, "test".getBytes, "test".getBytes)).get
+ producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
}
assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 11dc36e..8ce7c90 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -18,16 +18,15 @@ package kafka.admin
import org.junit.Assert._
import org.junit.Test
+import kafka.common.Topic
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.ConfigType
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils._
-import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.errors.TopicExistsException
-
class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@Test
@@ -87,12 +86,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
- "--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
+ "--topic", Topic.GroupMetadataTopicName))
TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
- // try to delete the TopicConstants.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
- val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
- val deleteOffsetTopicPath = getDeleteTopicPath(TopicConstants.GROUP_METADATA_TOPIC_NAME)
+ // try to delete the internal offsets topic (Topic.GroupMetadataTopicName) and make sure the deletion is rejected
+ val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GroupMetadataTopicName))
+ val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GroupMetadataTopicName)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 0e0a06a..1e073aa 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -21,9 +21,7 @@ package kafka.consumer
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
-import kafka.coordinator.GroupCoordinator
-import org.apache.kafka.common.internals.TopicConstants
-
+import kafka.common.Topic
class TopicFilterTest extends JUnitSuite {
@@ -38,8 +36,8 @@ class TopicFilterTest extends JUnitSuite {
val topicFilter2 = new Whitelist(".+")
assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
- assertFalse(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
- assertTrue(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
+ assertFalse(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false))
val topicFilter3 = new Whitelist("white_listed-topic.+")
assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -58,8 +56,8 @@ class TopicFilterTest extends JUnitSuite {
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
- assertFalse(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
- assertTrue(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
+ assertFalse(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false))
}
@Test
@@ -83,4 +81,4 @@ class TopicFilterTest extends JUnitSuite {
assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-"))
assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-"))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index fa13a92..c917ca4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator
import kafka.utils.timer.MockTimer
import org.apache.kafka.common.record.Record
import org.junit.Assert._
-import kafka.common.OffsetAndMetadata
+import kafka.common.{OffsetAndMetadata, Topic}
import kafka.message.{Message, MessageSet}
import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
import kafka.utils._
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit
import scala.collection._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}
-import org.apache.kafka.common.internals.TopicConstants
/**
* Test GroupCoordinator responses
@@ -80,12 +79,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
- ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
+ ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
- EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
+ EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
EasyMock.replay(zkUtils)
timer = new MockTimer
@@ -307,7 +306,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
assertEquals(Errors.NONE.code, syncGroupErrorCode)
EasyMock.reset(replicaManager)
- EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
+ EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
EasyMock.replay(replicaManager)
@@ -931,7 +930,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
- Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+ Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
)
)})
@@ -1009,7 +1008,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
- Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+ Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
)
)})
@@ -1023,7 +1022,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
val (responseFuture, responseCallback) = setupHeartbeatCallback
- EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
+ EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
EasyMock.replay(replicaManager)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 0bd6d71..b9569ca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -18,13 +18,12 @@
package kafka.coordinator
import kafka.cluster.Partition
-import kafka.common.OffsetAndMetadata
+import kafka.common.{OffsetAndMetadata, Topic}
import kafka.log.LogAppendInfo
import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.Record
import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -67,10 +66,10 @@ class GroupMetadataManagerTest {
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
- ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
+ ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
- EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
+ EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
EasyMock.replay(zkUtils)
time = new MockTime
@@ -236,7 +235,7 @@ class GroupMetadataManagerTest {
topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
- EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
@@ -290,7 +289,7 @@ class GroupMetadataManagerTest {
topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
- EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
@@ -351,7 +350,7 @@ class GroupMetadataManagerTest {
topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
- EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition))
+ EasyMock.expect(replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartitionId)).andStubReturn(Some(partition))
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
@@ -396,7 +395,7 @@ class GroupMetadataManagerTest {
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
- Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+ Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
)
)})
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 55eb6f8..46a79de 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -19,8 +19,8 @@ package kafka.server
import java.util.Properties
+import kafka.common.Topic
import kafka.utils.TestUtils
-import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.Assert._
@@ -67,7 +67,7 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testIsInternal() {
- val internalTopic = TopicConstants.GROUP_METADATA_TOPIC_NAME
+ val internalTopic = Topic.GroupMetadataTopicName
val notInternalTopic = "notInternal"
// create the topics
TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
@@ -82,6 +82,8 @@ class MetadataRequestTest extends BaseRequestTest {
assertTrue("internalTopic should show isInternal", internalTopicMetadata.isInternal)
assertFalse("notInternalTopic topic not should show isInternal", notInternalTopicMetadata.isInternal)
+
+ assertEquals(Set(internalTopic).asJava, metadataResponse.cluster.internalTopics)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index b31b20d..ba6289c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -49,7 +49,8 @@ public class WindowedStreamPartitionerTest {
new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
+ private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
+ Collections.<String>emptySet());
@Test
public void testCopartitioning() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index c11d0c1..9683da9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -44,7 +44,8 @@ public class DefaultPartitionGrouperTest {
new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
+ private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
+ Collections.<String>emptySet());
@Test
public void testGrouping() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 32f2598..b1a4a02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -47,7 +47,8 @@ public class RecordCollectorTest {
new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
+ private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos,
+ Collections.<String>emptySet(), Collections.<String>emptySet());
private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 4f7037c..21de73a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -78,7 +78,8 @@ public class StreamPartitionAssignorTest {
new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+ private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+ Collections.<String>emptySet());
private final TaskId task0 = new TaskId(0, 0);
private final TaskId task1 = new TaskId(0, 1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb65ff40/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cb3dee0..d1aaa07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -81,7 +81,8 @@ public class StreamThreadTest {
new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+ private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+ Collections.<String>emptySet());
private final PartitionAssignor.Subscription subscription =
new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData());