You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/13 03:54:30 UTC
kafka git commit: KAFKA-2826: Make Kafka Connect ducktape services
easier to extend.
Repository: kafka
Updated Branches:
refs/heads/trunk 2802bd081 -> 969d0cb0a
KAFKA-2826: Make Kafka Connect ducktape services easier to extend.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #522 from ewencp/kafka-2826-extensible-connect-services
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/969d0cb0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/969d0cb0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/969d0cb0
Branch: refs/heads/trunk
Commit: 969d0cb0ae316ba0dfdb34ed096bfd56fe86ad92
Parents: 2802bd0
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 12 18:54:20 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 12 18:54:20 2015 -0800
----------------------------------------------------------------------
tests/kafkatest/services/connect.py | 23 +++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/969d0cb0/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 26feb99..a6e902f 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -151,6 +151,13 @@ class ConnectStandaloneService(ConnectServiceBase):
def node(self):
return self.nodes[0]
+ def start_cmd(self, node, connector_configs):
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+ cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+ cmd += " ".join(connector_configs)
+ cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
+ return cmd
+
def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
@@ -164,10 +171,7 @@ class ConnectStandaloneService(ConnectServiceBase):
self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
- node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE +
- "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) +
- " ".join(remote_connector_configs) +
- (" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)))
+ node.account.ssh(self.start_cmd(node, remote_connector_configs))
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
@@ -182,6 +186,12 @@ class ConnectDistributedService(ConnectServiceBase):
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
+ def start_cmd(self, node):
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+ cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+ cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
+ return cmd
+
def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
@@ -192,10 +202,7 @@ class ConnectDistributedService(ConnectServiceBase):
self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
- cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
- cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
- cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
- node.account.ssh(cmd)
+ node.account.ssh(self.start_cmd(node))
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0: