You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/10 23:54:58 UTC
kafka git commit: MINOR: Add unit test for internal topics
Repository: kafka
Updated Branches:
refs/heads/trunk 04ef9c354 -> 9c4c5ae1c
MINOR: Add unit test for internal topics
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1047 from guozhangwang/KInternal
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c4c5ae1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c4c5ae1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c4c5ae1
Branch: refs/heads/trunk
Commit: 9c4c5ae1cd15aa0afe3156e572362fbb40130573
Parents: 04ef9c3
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 10 14:54:47 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 10 14:54:47 2016 -0800
----------------------------------------------------------------------
.../internals/InternalTopicManager.java | 13 ++--
.../internals/StreamPartitionAssignor.java | 6 +-
.../internals/StreamPartitionAssignorTest.java | 66 ++++++++++++++++++++
3 files changed, 80 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index ce95bb0..3725c4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -73,6 +73,11 @@ public class InternalTopicManager {
}
}
+ public InternalTopicManager() {
+ this.zkClient = null;
+ this.replicationFactor = 0;
+ }
+
public InternalTopicManager(String zkConnect, int replicationFactor) {
this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
this.replicationFactor = replicationFactor;
@@ -125,7 +130,7 @@ public class InternalTopicManager {
}
@SuppressWarnings("unchecked")
- public Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+ private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
if (data == null) return null;
@@ -147,7 +152,7 @@ public class InternalTopicManager {
}
}
- public void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
+ private void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
List<Integer> brokers = getBrokers();
@@ -183,13 +188,13 @@ public class InternalTopicManager {
}
}
- public void deleteTopic(String topic) throws ZkNodeExistsException {
+ private void deleteTopic(String topic) throws ZkNodeExistsException {
log.debug("Deleting topic {} from ZK in partition assignor.", topic);
zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
- public void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
+ private void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
List<Integer> brokers = getBrokers();
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1b3bf10..13f269b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -231,7 +231,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
log.debug("Starting to validate internal source topics in partition assignor.");
for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
- String topic = streamThread.jobId + "-" + entry.getKey();
+ String topic = entry.getKey();
// should have size 1 only
int numPartitions = -1;
@@ -455,4 +455,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
}
+
+ public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+ this.internalTopicManager = internalTopicManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 15b114a..9ff0af0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -506,4 +506,70 @@ public class StreamPartitionAssignorTest {
assertEquals(standbyTasks, partitionAssignor.standbyTasks());
}
+ @Test
+ public void testAssignWithInternalTopics() throws Exception {
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addInternalTopic("topicX");
+ builder.addSource("source1", "topic1");
+ builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+ builder.addSink("sink1", "topicX", "processor1");
+ builder.addSource("source2", "topicX");
+ builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
+ List<String> topics = Utils.mkList("topic1", "topicX");
+ Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ String client1 = "client1";
+
+ StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+ MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(mockRestoreConsumer);
+ partitionAssignor.setInternalTopicManager(internalTopicManager);
+
+ Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+
+ Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+ // check prepared internal topics
+ // TODO: we need to change it to 1 after fixing the prefix
+ assertEquals(2, internalTopicManager.readyTopics.size());
+ assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX"));
+ assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
+ }
+
+ private class MockInternalTopicManager extends InternalTopicManager {
+
+ public Map<String, Integer> readyTopics = new HashMap<>();
+ public MockConsumer<byte[], byte[]> restoreConsumer;
+
+ public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
+ super();
+
+ this.restoreConsumer = restoreConsumer;
+ }
+
+ @Override
+ public void makeReady(String topic, int numPartitions) {
+ readyTopics.put(topic, numPartitions);
+
+ List<PartitionInfo> partitions = new ArrayList<>();
+ for (int i = 0; i < numPartitions; i++) {
+ partitions.add(new PartitionInfo(topic, i, null, null, null));
+ }
+
+ restoreConsumer.updatePartitions(topic, partitions);
+ }
+ }
}