You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/11 17:10:31 UTC
kafka git commit: MINOR: Extend mirror maker test to include
interceptors
Repository: kafka
Updated Branches:
refs/heads/trunk 0fc1898bf -> 978393446
MINOR: Extend mirror maker test to include interceptors
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2081 from kkonstantine/MINOR-Extend-mirror-maker-test-to-include-interceptors
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97839344
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97839344
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97839344
Branch: refs/heads/trunk
Commit: 97839344601cc67256e871bd461e877eed974104
Parents: 0fc1898
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Fri Nov 11 09:10:20 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Nov 11 09:10:20 2016 -0800
----------------------------------------------------------------------
tests/kafkatest/services/connect.py | 13 +++++++++++--
tests/kafkatest/services/kafka/kafka.py | 1 +
.../services/kafka/templates/log4j.properties | 2 +-
tests/kafkatest/services/mirror_maker.py | 14 ++++++++++++--
.../templates/mirror_maker_consumer.properties | 8 ++++++++
.../templates/mirror_maker_producer.properties | 4 ++++
6 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index bd2c9b9..091eaf7 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -57,6 +57,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
self.files = files
+ self.environment = {}
def pids(self, node):
"""Return process ids for Kafka Connect processes."""
@@ -208,6 +209,8 @@ class ConnectStandaloneService(ConnectServiceBase):
def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+ for envvar in self.environment:
+ cmd += "export %s=%s; " % envvar, str(self.environment[envvar])
cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
cmd += " ".join(connector_configs)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
@@ -227,7 +230,9 @@ class ConnectStandaloneService(ConnectServiceBase):
self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
- node.account.ssh(self.start_cmd(node, remote_connector_configs))
+ cmd = self.start_cmd(node, remote_connector_configs)
+ self.logger.debug("Connect standalone command: %s", cmd)
+ node.account.ssh(cmd)
monitor.wait_until('Kafka Connect started', timeout_sec=30, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
@@ -247,6 +252,8 @@ class ConnectDistributedService(ConnectServiceBase):
def start_cmd(self, node):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+ for envvar in self.environment:
+ cmd += "export %s=%s; " % envvar, str(self.environment[envvar])
cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
return cmd
@@ -262,7 +269,9 @@ class ConnectDistributedService(ConnectServiceBase):
self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
- node.account.ssh(self.start_cmd(node))
+ cmd = self.start_cmd(node)
+ self.logger.debug("Connect distributed command: %s", cmd)
+ node.account.ssh(cmd)
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 2fe169b..4ac53b1 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -88,6 +88,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.authorizer_class_name = authorizer_class_name
self.zk_set_acl = False
self.server_prop_overides = server_prop_overides
+ self.log_level = "DEBUG"
#
# In a heavily loaded and not very fast machine, it is
http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/kafka/templates/log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties
index 8098f3b..7a2a8dc 100644
--- a/tests/kafkatest/services/kafka/templates/log4j.properties
+++ b/tests/kafkatest/services/kafka/templates/log4j.properties
@@ -111,7 +111,7 @@ log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaInfoAppender, kafkaDebugAppend
log4j.logger.kafka.perf=DEBUG, kafkaInfoAppender, kafkaDebugAppender
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaInfoAppender, kafkaDebugAppender
log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG, kafkaInfoAppender, kafkaDebugAppender
-log4j.logger.kafka=DEBUG, kafkaInfoAppender, kafkaDebugAppender
+log4j.logger.kafka={{ log_level|default("DEBUG") }}, kafkaInfoAppender, kafkaDebugAppender
log4j.logger.kafka.network.RequestChannel$=DEBUG, requestInfoAppender, requestDebugAppender
log4j.additivity.kafka.network.RequestChannel$=false
http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 7c0601f..734452c 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -73,7 +73,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1,
new_consumer=False, consumer_timeout_ms=None, offsets_storage="kafka",
- offset_commit_interval_ms=60000):
+ offset_commit_interval_ms=60000, log_level="DEBUG", producer_interceptor_classes=None):
"""
MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
@@ -91,7 +91,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
offset_commit_interval_ms: how frequently the mirror maker consumer commits offsets
"""
super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
- self.log_level = "DEBUG"
+ self.log_level = log_level
self.new_consumer = new_consumer
self.consumer_timeout_ms = consumer_timeout_ms
self.num_streams = num_streams
@@ -108,11 +108,21 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
raise Exception("offsets_storage should be 'kafka' or 'zookeeper'. Instead found %s" % self.offsets_storage)
self.offset_commit_interval_ms = offset_commit_interval_ms
+ self.producer_interceptor_classes = producer_interceptor_classes
+ self.external_jars = None
+
+ # These properties are potentially used by third-party tests.
+ self.source_auto_offset_reset = None
+ self.partition_assignment_strategy = None
def start_cmd(self, node):
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+ # add external dependencies, for instance for interceptors
+ if self.external_jars is not None:
+ cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % self.external_jars
+ cmd += "export CLASSPATH; "
cmd += " %s kafka.tools.MirrorMaker" % self.path.script("kafka-run-class.sh", node)
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/templates/mirror_maker_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/mirror_maker_consumer.properties b/tests/kafkatest/services/templates/mirror_maker_consumer.properties
index 68641ab..0e5b472 100644
--- a/tests/kafkatest/services/templates/mirror_maker_consumer.properties
+++ b/tests/kafkatest/services/templates/mirror_maker_consumer.properties
@@ -21,9 +21,17 @@ zookeeper.connect={{ source.zk.connect_setting() }}
zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
{% endif %}
+{% if source_auto_offset_reset is defined and source_auto_offset_reset is not none %}
+auto.offset.reset={{ source_auto_offset_reset|default('latest') }}
+{% endif %}
+
group.id={{ group_id|default('test-consumer-group') }}
offsets.storage={{ offsets_storage }}
{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
+
+{% if partition_assignment_strategy is defined and partition_assignment_strategy is not none %}
+partition.assignment.strategy={{ partition_assignment_strategy }}
+{% endif %}
http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/templates/mirror_maker_producer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/mirror_maker_producer.properties b/tests/kafkatest/services/templates/mirror_maker_producer.properties
index ff50b50..01cb75f 100644
--- a/tests/kafkatest/services/templates/mirror_maker_producer.properties
+++ b/tests/kafkatest/services/templates/mirror_maker_producer.properties
@@ -19,3 +19,7 @@ bootstrap.servers = {{ target.bootstrap_servers(security_config.security_protoco
producer.type={{ producer_type|default("async") }} # sync or async
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
+
+{% if producer_interceptor_classes is defined and producer_interceptor_classes is not none %}
+interceptor.classes={{ producer_interceptor_classes }}
+{% endif %}