You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/10 23:54:58 UTC

kafka git commit: MINOR: Add unit test for internal topics

Repository: kafka
Updated Branches:
  refs/heads/trunk 04ef9c354 -> 9c4c5ae1c


MINOR: Add unit test for internal topics

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1047 from guozhangwang/KInternal


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c4c5ae1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c4c5ae1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c4c5ae1

Branch: refs/heads/trunk
Commit: 9c4c5ae1cd15aa0afe3156e572362fbb40130573
Parents: 04ef9c3
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 10 14:54:47 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 10 14:54:47 2016 -0800

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         | 13 ++--
 .../internals/StreamPartitionAssignor.java      |  6 +-
 .../internals/StreamPartitionAssignorTest.java  | 66 ++++++++++++++++++++
 3 files changed, 80 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index ce95bb0..3725c4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -73,6 +73,11 @@ public class InternalTopicManager {
         }
     }
 
+    public InternalTopicManager() {
+        this.zkClient = null;
+        this.replicationFactor = 0;
+    }
+
     public InternalTopicManager(String zkConnect, int replicationFactor) {
         this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
         this.replicationFactor = replicationFactor;
@@ -125,7 +130,7 @@ public class InternalTopicManager {
     }
 
     @SuppressWarnings("unchecked")
-    public Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
         String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
 
         if (data == null) return null;
@@ -147,7 +152,7 @@ public class InternalTopicManager {
         }
     }
 
-    public void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
+    private void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
         log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
 
         List<Integer> brokers = getBrokers();
@@ -183,13 +188,13 @@ public class InternalTopicManager {
         }
     }
 
-    public void deleteTopic(String topic) throws ZkNodeExistsException {
+    private void deleteTopic(String topic) throws ZkNodeExistsException {
         log.debug("Deleting topic {} from ZK in partition assignor.", topic);
 
         zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
     }
 
-    public void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
+    private void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
         log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
 
         List<Integer> brokers = getBrokers();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1b3bf10..13f269b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -231,7 +231,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             log.debug("Starting to validate internal source topics in partition assignor.");
 
             for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
-                String topic = streamThread.jobId + "-" + entry.getKey();
+                String topic = entry.getKey();
 
                 // should have size 1 only
                 int numPartitions = -1;
@@ -455,4 +455,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     public Map<TaskId, Set<TopicPartition>> standbyTasks() {
         return standbyTasks;
     }
+
+    public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+        this.internalTopicManager = internalTopicManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 15b114a..9ff0af0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -506,4 +506,70 @@ public class StreamPartitionAssignorTest {
         assertEquals(standbyTasks, partitionAssignor.standbyTasks());
     }
 
+    @Test
+    public void testAssignWithInternalTopics() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addInternalTopic("topicX");
+        builder.addSource("source1", "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addSink("sink1", "topicX", "processor1");
+        builder.addSource("source2", "topicX");
+        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
+        List<String> topics = Utils.mkList("topic1", "topicX");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        String client1 = "client1";
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(mockRestoreConsumer);
+        partitionAssignor.setInternalTopicManager(internalTopicManager);
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check prepared internal topics
+        // TODO: we need to change it to 1 after fixing the prefix
+        assertEquals(2, internalTopicManager.readyTopics.size());
+        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX"));
+        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
+    }
+
+    private class MockInternalTopicManager extends InternalTopicManager {
+
+        public Map<String, Integer> readyTopics = new HashMap<>();
+        public MockConsumer<byte[], byte[]> restoreConsumer;
+
+        public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
+            super();
+
+            this.restoreConsumer = restoreConsumer;
+        }
+
+        @Override
+        public void makeReady(String topic, int numPartitions) {
+            readyTopics.put(topic, numPartitions);
+
+            List<PartitionInfo> partitions = new ArrayList<>();
+            for (int i = 0; i < numPartitions; i++) {
+                partitions.add(new PartitionInfo(topic, i, null, null, null));
+            }
+
+            restoreConsumer.updatePartitions(topic, partitions);
+        }
+    }
 }