You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/03/26 05:03:05 UTC
[kafka] branch 2.0 updated: MINOR: Backport kafkatest per-broker
overrides and extra JVM args (#8347)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 052f658 MINOR: Backport kafkatest per-broker overrides and extra JVM args (#8347)
052f658 is described below
commit 052f658e0eff47f8e48ff35d4339c8db97f02610
Author: Brian Bushree <bb...@confluent.io>
AuthorDate: Wed Mar 25 21:58:48 2020 -0700
MINOR: Backport kafkatest per-broker overrides and extra JVM args (#8347)
Backport of #7297 and #7715 to allow per-node broker overrides and extra JVM args
Co-authored-by: David Arthur <mu...@gmail.com>
Reviewers: Konstantine Karantasis <ko...@confluent.io>
---
tests/kafkatest/services/kafka/kafka.py | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index a59bb71..735cf42 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -75,7 +75,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
- jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None):
+ jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
+ per_node_server_prop_overrides=None, extra_kafka_opts=""):
"""
:type context
:type zk: ZookeeperService
@@ -99,8 +100,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.server_prop_overides = []
else:
self.server_prop_overides = server_prop_overides
+ if per_node_server_prop_overrides is None:
+ self.per_node_server_prop_overrides = {}
+ else:
+ self.per_node_server_prop_overrides = per_node_server_prop_overrides
self.log_level = "DEBUG"
self.zk_chroot = zk_chroot
+ self.extra_kafka_opts = extra_kafka_opts
#
# In a heavily loaded and not very fast machine, it is
@@ -232,6 +238,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for prop in self.server_prop_overides:
override_configs[prop[0]] = prop[1]
+ for prop in self.per_node_server_prop_overrides.get(self.idx(node), []):
+ override_configs[prop[0]] = prop[1]
+
#update template configs with test override configs
configs.update(override_configs)
@@ -253,8 +262,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["kafka_heap_dump_file"]["path"]
- other_kafka_opts = self.security_config.kafka_opts.strip('\"')
- cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
+ security_kafka_opts = self.security_config.kafka_opts.strip('\"')
+ cmd += "export KAFKA_OPTS=\"%s %s %s\"; " % (heap_kafka_opts, security_kafka_opts, self.extra_kafka_opts)
cmd += "%s %s 1>> %s 2>> %s &" % \
(self.path.script("kafka-server-start.sh", node),
KafkaService.CONFIG_FILE,