You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/29 14:54:43 UTC

[kafka] branch trunk updated: MINOR: Fix Trogdor tests, partition assignments (#4892)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8577632  MINOR: Fix Trogdor tests, partition assignments (#4892)
8577632 is described below

commit 8577632b3a7dd3dd5ce9cf2562627983bbabba20
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Sun Apr 29 07:54:38 2018 -0700

    MINOR: Fix Trogdor tests, partition assignments (#4892)
---
 .../services/trogdor/produce_bench_workload.py       |  4 ++--
 .../services/trogdor/round_trip_workload.py          |  4 ++--
 tests/kafkatest/tests/core/produce_bench_test.py     |  6 ++++--
 tests/kafkatest/tests/core/round_trip_fault_test.py  |  9 +++++++--
 .../kafka/trogdor/workload/PartitionsSpec.java       | 20 +++++++++++++++++---
 .../kafka/trogdor/common/JsonSerializationTest.java  |  9 +++++++++
 6 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index bce28c0..7eac4ee 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -21,7 +21,7 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
 class ProduceBenchWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
                  target_messages_per_sec, max_messages, producer_conf,
-                 total_topics, active_topics):
+                 inactive_topics, active_topics):
         super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
         self.message["producerNode"] = producer_node
@@ -29,7 +29,7 @@ class ProduceBenchWorkloadSpec(TaskSpec):
         self.message["targetMessagesPerSec"] = target_messages_per_sec
         self.message["maxMessages"] = max_messages
         self.message["producerConf"] = producer_conf
-        self.message["totalTopics"] = total_topics
+        self.message["inactiveTopics"] = inactive_topics
         self.message["activeTopics"] = active_topics
 
 
diff --git a/tests/kafkatest/services/trogdor/round_trip_workload.py b/tests/kafkatest/services/trogdor/round_trip_workload.py
index 588bba8..86bc2d2 100644
--- a/tests/kafkatest/services/trogdor/round_trip_workload.py
+++ b/tests/kafkatest/services/trogdor/round_trip_workload.py
@@ -20,14 +20,14 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
 
 class RoundTripWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, client_node, bootstrap_servers,
-                 target_messages_per_sec, partition_assignments, max_messages):
+                 target_messages_per_sec, max_messages, active_topics):
         super(RoundTripWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = "org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec"
         self.message["clientNode"] = client_node
         self.message["bootstrapServers"] = bootstrap_servers
         self.message["targetMessagesPerSec"] = target_messages_per_sec
-        self.message["partitionAssignments"] = partition_assignments
         self.message["maxMessages"] = max_messages
+        self.message["activeTopics"] = active_topics
 
 
 class RoundTripWorkloadService(Service):
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index 99df666..6a1724d 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -43,14 +43,16 @@ class ProduceBenchTest(Test):
         self.zk.stop()
 
     def test_produce_bench(self):
+        active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, "replicationFactor":3}}
+        inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, "replicationFactor":3}}
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         self.workload_service.bootstrap_servers,
                                         target_messages_per_sec=1000,
                                         max_messages=100000,
                                         producer_conf={},
-                                        total_topics=10,
-                                        active_topics=2)
+                                        inactive_topics=inactive_topics,
+                                        active_topics=active_topics)
         workload1 = self.trogdor.create_task("workload1", spec)
         workload1.wait_for_done(timeout_sec=360)
         tasks = self.trogdor.tasks()
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index f03d6a1..8bee6a1 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -25,6 +25,8 @@ from kafkatest.services.zookeeper import ZookeeperService
 
 
 class RoundTripFaultTest(Test):
+    topic_name_index = 0
+
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(RoundTripFaultTest, self).__init__(test_context)
@@ -33,12 +35,15 @@ class RoundTripFaultTest(Test):
         self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
         self.trogdor = TrogdorService(context=self.test_context,
                                       client_services=[self.zk, self.kafka, self.workload_service])
+        topic_name = "round_trip_topic%d" % RoundTripFaultTest.topic_name_index
+        RoundTripFaultTest.topic_name_index = RoundTripFaultTest.topic_name_index + 1
+        active_topics={topic_name : {"partitionAssignments":{"0": [0,1,2]}}}
         self.round_trip_spec = RoundTripWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                      self.workload_service.client_node,
                                      self.workload_service.bootstrap_servers,
                                      target_messages_per_sec=10000,
-                                     partition_assignments={0: [0,1,2]},
-                                     max_messages=100000)
+                                     max_messages=100000,
+                                     active_topics=active_topics)
 
     def setUp(self):
         self.zk.start()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
index 75f85c4..a6ebb21 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
@@ -23,9 +23,11 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.trogdor.rest.Message;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Describes some partitions.
@@ -44,8 +46,20 @@ public class PartitionsSpec extends Message {
             @JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) {
         this.numPartitions = numPartitions;
         this.replicationFactor = replicationFactor;
-        this.partitionAssignments = partitionAssignments == null ?
-            new HashMap<Integer, List<Integer>>() : partitionAssignments;
+        HashMap<Integer, List<Integer>> partMap = new HashMap<>();
+        if (partitionAssignments != null) {
+            for (Entry<Integer, List<Integer>> entry : partitionAssignments.entrySet()) {
+                int partition = entry.getKey() == null ? 0 : entry.getKey();
+                ArrayList<Integer> assignments = new ArrayList<>();
+                if (entry.getValue() != null) {
+                    for (Integer brokerId : entry.getValue()) {
+                        assignments.add(brokerId == null ? Integer.valueOf(0) : brokerId);
+                    }
+                }
+                partMap.put(partition, Collections.unmodifiableList(assignments));
+            }
+        }
+        this.partitionAssignments = Collections.unmodifiableMap(partMap);
     }
 
     @JsonProperty
@@ -72,7 +86,7 @@ public class PartitionsSpec extends Message {
     }
 
     @JsonProperty
-    public Map<Integer, List<Integer>> partitionAssignmentsap() {
+    public Map<Integer, List<Integer>> partitionAssignments() {
         return partitionAssignments;
     }
 
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 55ecb1a..ea1eda6 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -33,6 +33,10 @@ import org.apache.kafka.trogdor.workload.TopicsSpec;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -55,6 +59,11 @@ public class JsonSerializationTest {
             0, null, null, 0));
         verify(new TopicsSpec());
         verify(new PartitionsSpec(0, (short) 0, null));
+        Map<Integer, List<Integer>> partitionAssignments = new HashMap<Integer, List<Integer>>();
+        partitionAssignments.put(0, Arrays.asList(1, 2, 3));
+        partitionAssignments.put(1, Arrays.asList(1, 2, 3));
+        verify(new PartitionsSpec(0, (short) 0, partitionAssignments));
+        verify(new PartitionsSpec(0, (short) 0, null));
     }
 
     private <T> void verify(T val1) throws Exception {

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.