You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/29 14:54:43 UTC
[kafka] branch trunk updated: MINOR: Fix Trogdor tests,
partition assignments (#4892)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8577632 MINOR: Fix Trogdor tests, partition assignments (#4892)
8577632 is described below
commit 8577632b3a7dd3dd5ce9cf2562627983bbabba20
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Sun Apr 29 07:54:38 2018 -0700
MINOR: Fix Trogdor tests, partition assignments (#4892)
---
.../services/trogdor/produce_bench_workload.py | 4 ++--
.../services/trogdor/round_trip_workload.py | 4 ++--
tests/kafkatest/tests/core/produce_bench_test.py | 6 ++++--
tests/kafkatest/tests/core/round_trip_fault_test.py | 9 +++++++--
.../kafka/trogdor/workload/PartitionsSpec.java | 20 +++++++++++++++++---
.../kafka/trogdor/common/JsonSerializationTest.java | 9 +++++++++
6 files changed, 41 insertions(+), 11 deletions(-)
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index bce28c0..7eac4ee 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -21,7 +21,7 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class ProduceBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
target_messages_per_sec, max_messages, producer_conf,
- total_topics, active_topics):
+ inactive_topics, active_topics):
super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
self.message["producerNode"] = producer_node
@@ -29,7 +29,7 @@ class ProduceBenchWorkloadSpec(TaskSpec):
self.message["targetMessagesPerSec"] = target_messages_per_sec
self.message["maxMessages"] = max_messages
self.message["producerConf"] = producer_conf
- self.message["totalTopics"] = total_topics
+ self.message["inactiveTopics"] = inactive_topics
self.message["activeTopics"] = active_topics
diff --git a/tests/kafkatest/services/trogdor/round_trip_workload.py b/tests/kafkatest/services/trogdor/round_trip_workload.py
index 588bba8..86bc2d2 100644
--- a/tests/kafkatest/services/trogdor/round_trip_workload.py
+++ b/tests/kafkatest/services/trogdor/round_trip_workload.py
@@ -20,14 +20,14 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
class RoundTripWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, client_node, bootstrap_servers,
- target_messages_per_sec, partition_assignments, max_messages):
+ target_messages_per_sec, max_messages, active_topics):
super(RoundTripWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec"
self.message["clientNode"] = client_node
self.message["bootstrapServers"] = bootstrap_servers
self.message["targetMessagesPerSec"] = target_messages_per_sec
- self.message["partitionAssignments"] = partition_assignments
self.message["maxMessages"] = max_messages
+ self.message["activeTopics"] = active_topics
class RoundTripWorkloadService(Service):
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index 99df666..6a1724d 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -43,14 +43,16 @@ class ProduceBenchTest(Test):
self.zk.stop()
def test_produce_bench(self):
+ active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, "replicationFactor":3}}
+ inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, "replicationFactor":3}}
spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.producer_node,
self.workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=100000,
producer_conf={},
- total_topics=10,
- active_topics=2)
+ inactive_topics=inactive_topics,
+ active_topics=active_topics)
workload1 = self.trogdor.create_task("workload1", spec)
workload1.wait_for_done(timeout_sec=360)
tasks = self.trogdor.tasks()
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index f03d6a1..8bee6a1 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -25,6 +25,8 @@ from kafkatest.services.zookeeper import ZookeeperService
class RoundTripFaultTest(Test):
+ topic_name_index = 0
+
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(RoundTripFaultTest, self).__init__(test_context)
@@ -33,12 +35,15 @@ class RoundTripFaultTest(Test):
self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
self.trogdor = TrogdorService(context=self.test_context,
client_services=[self.zk, self.kafka, self.workload_service])
+ topic_name = "round_trip_topic%d" % RoundTripFaultTest.topic_name_index
+ RoundTripFaultTest.topic_name_index = RoundTripFaultTest.topic_name_index + 1
+ active_topics={topic_name : {"partitionAssignments":{"0": [0,1,2]}}}
self.round_trip_spec = RoundTripWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.client_node,
self.workload_service.bootstrap_servers,
target_messages_per_sec=10000,
- partition_assignments={0: [0,1,2]},
- max_messages=100000)
+ max_messages=100000,
+ active_topics=active_topics)
def setUp(self):
self.zk.start()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
index 75f85c4..a6ebb21 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
@@ -23,9 +23,11 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.trogdor.rest.Message;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
/**
* Describes some partitions.
@@ -44,8 +46,20 @@ public class PartitionsSpec extends Message {
@JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) {
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
- this.partitionAssignments = partitionAssignments == null ?
- new HashMap<Integer, List<Integer>>() : partitionAssignments;
+ HashMap<Integer, List<Integer>> partMap = new HashMap<>();
+ if (partitionAssignments != null) {
+ for (Entry<Integer, List<Integer>> entry : partitionAssignments.entrySet()) {
+ int partition = entry.getKey() == null ? 0 : entry.getKey();
+ ArrayList<Integer> assignments = new ArrayList<>();
+ if (entry.getValue() != null) {
+ for (Integer brokerId : entry.getValue()) {
+ assignments.add(brokerId == null ? Integer.valueOf(0) : brokerId);
+ }
+ }
+ partMap.put(partition, Collections.unmodifiableList(assignments));
+ }
+ }
+ this.partitionAssignments = Collections.unmodifiableMap(partMap);
}
@JsonProperty
@@ -72,7 +86,7 @@ public class PartitionsSpec extends Message {
}
@JsonProperty
- public Map<Integer, List<Integer>> partitionAssignmentsap() {
+ public Map<Integer, List<Integer>> partitionAssignments() {
return partitionAssignments;
}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 55ecb1a..ea1eda6 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -33,6 +33,10 @@ import org.apache.kafka.trogdor.workload.TopicsSpec;
import org.junit.Test;
import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertNotNull;
@@ -55,6 +59,11 @@ public class JsonSerializationTest {
0, null, null, 0));
verify(new TopicsSpec());
verify(new PartitionsSpec(0, (short) 0, null));
+ Map<Integer, List<Integer>> partitionAssignments = new HashMap<Integer, List<Integer>>();
+ partitionAssignments.put(0, Arrays.asList(1, 2, 3));
+ partitionAssignments.put(1, Arrays.asList(1, 2, 3));
+ verify(new PartitionsSpec(0, (short) 0, partitionAssignments));
+ verify(new PartitionsSpec(0, (short) 0, null));
}
private <T> void verify(T val1) throws Exception {
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.