You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/09 23:56:37 UTC
kafka git commit: KAFKA-3676: system tests for connector pause/resume
Repository: kafka
Updated Branches:
refs/heads/trunk eb1de107b -> f96da638e
KAFKA-3676: system tests for connector pause/resume
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1345 from hachikuji/KAFKA-3676
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f96da638
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f96da638
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f96da638
Branch: refs/heads/trunk
Commit: f96da638ee9a4e1e47ece1ea337ee071d911c3da
Parents: eb1de10
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon May 9 16:56:32 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon May 9 16:56:32 2016 -0700
----------------------------------------------------------------------
.../storage/KafkaConfigBackingStore.java | 2 +-
tests/kafkatest/services/connect.py | 28 +++-
tests/kafkatest/services/kafka/kafka.py | 2 +-
.../tests/connect/connect_distributed_test.py | 146 ++++++++++++++++++-
4 files changed, 166 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index a894f31..9a93a4e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -469,7 +469,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
try {
TargetState state = TargetState.valueOf((String) targetState);
- log.trace("Setting target state for connector {} to {}", connectorName, targetState);
+ log.debug("Setting target state for connector {} to {}", connectorName, targetState);
connectorTargetStates.put(connectorName, state);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index cf67c30..aad9ff3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -88,12 +88,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
- def restart(self):
+ def restart(self, clean_shutdown=True):
# We don't want to do any clean up here, just restart the process.
for node in self.nodes:
self.logger.info("Restarting Kafka Connect on " + str(node.account))
- self.stop_node(node)
- self.start_node(node)
+ self.restart_node(node, clean_shutdown)
+
+ def restart_node(self, node, clean_shutdown=True):
+ self.stop_node(node, clean_shutdown)
+ self.start_node(node)
def clean_node(self, node):
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
@@ -128,6 +131,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff)
+ def get_connector_status(self, name, node=None):
+ return self._rest('/connectors/' + name + '/status', node=node)
+
+ def pause_connector(self, name, node=None):
+ return self._rest('/connectors/' + name + '/pause', method="PUT")
+
+ def resume_connector(self, name, node=None):
+ return self._rest('/connectors/' + name + '/resume', method="PUT")
+
def _rest(self, path, body=None, node=None, method="GET"):
if node is None:
node = random.choice(self.nodes)
@@ -139,7 +151,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self.logger.debug("%s %s response: %d", url, method, resp.status_code)
if resp.status_code > 400:
raise ConnectRestError(resp.status_code, resp.text, resp.url)
- if resp.status_code == 204:
+ if resp.status_code == 204 or resp.status_code == 202:
return None
else:
return resp.json()
@@ -185,7 +197,7 @@ class ConnectStandaloneService(ConnectServiceBase):
self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
node.account.ssh(self.start_cmd(node, remote_connector_configs))
- monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
+ monitor.wait_until('Kafka Connect started', timeout_sec=30, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
@@ -298,6 +310,12 @@ class VerifiableSink(VerifiableConnector):
self.tasks = tasks
self.topics = topics
+ def flushed_messages(self):
+ return filter(lambda m: 'flushed' in m and m['flushed'], self.messages())
+
+ def received_messages(self):
+ return filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages())
+
def start(self):
self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
self.cc.create_connector({
http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 6ff7d0c..334069d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -227,7 +227,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for pid in pids:
node.account.signal(pid, sig, allow_fail=False)
- wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
+ wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Kafka node failed to stop")
def clean_node(self, node):
JmxMixin.clean_node(self, node)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 698a827..d3ae2e1 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -17,13 +17,14 @@ from ducktape.tests.test import Test
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
-from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
+from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import subprocess, itertools, time
from collections import Counter
+import operator
class ConnectDistributedTest(Test):
"""
@@ -73,6 +74,142 @@ class ConnectDistributedTest(Test):
self.zk.start()
self.kafka.start()
+ def _start_connector(self, config_file):
+ connector_props = self.render(config_file)
+ connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+ self.cc.create_connector(connector_config)
+
+ def _connector_status(self, connector, node=None):
+ try:
+ return self.cc.get_connector_status(connector, node)
+ except ConnectRestError:
+ return None
+
+ def _has_state(self, status, state):
+ return status is not None and status['connector']['state'] == state
+
+ def _all_tasks_have_state(self, status, task_count, state):
+ if status is None:
+ return False
+
+ tasks = status['tasks']
+ if len(tasks) != task_count:
+ return False
+
+ return reduce(operator.and_, [task['state'] == state for task in tasks], True)
+
+ def is_running(self, connector, node=None):
+ status = self._connector_status(connector.name, node)
+ return self._has_state(status, 'RUNNING') and self._all_tasks_have_state(status, connector.tasks, 'RUNNING')
+
+ def is_paused(self, connector, node=None):
+ status = self._connector_status(connector.name, node)
+ return self._has_state(status, 'PAUSED') and self._all_tasks_have_state(status, connector.tasks, 'PAUSED')
+
+ def test_pause_and_resume_source(self):
+ """
+ Verify that source connectors stop producing records when paused and begin again after
+ being resumed.
+ """
+
+ self.setup_services()
+ self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+ self.cc.start()
+
+ self.source = VerifiableSource(self.cc)
+ self.source.start()
+
+ wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+
+ self.cc.pause_connector(self.source.name)
+
+ # wait until all nodes report the paused transition
+ for node in self.cc.nodes:
+ wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+ err_msg="Failed to see connector transition to the PAUSED state")
+
+ # verify that we do not produce new messages while paused
+ num_messages = len(self.source.messages())
+ time.sleep(10)
+ assert num_messages == len(self.source.messages()), "Paused source connector should not produce any messages"
+
+ self.cc.resume_connector(self.source.name)
+
+ for node in self.cc.nodes:
+ wait_until(lambda: self.is_running(self.source, node), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+
+ # after resuming, we should see records produced again
+ wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
+ err_msg="Failed to produce messages after resuming source connector")
+
+ def test_pause_and_resume_sink(self):
+ """
+ Verify that sink connectors stop consuming records when paused and begin again after
+ being resumed.
+ """
+
+ self.setup_services()
+ self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+ self.cc.start()
+
+ # use the verifiable source to produce a steady stream of messages
+ self.source = VerifiableSource(self.cc)
+ self.source.start()
+
+ self.sink = VerifiableSink(self.cc)
+ self.sink.start()
+
+ wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+
+ self.cc.pause_connector(self.sink.name)
+
+ # wait until all nodes report the paused transition
+ for node in self.cc.nodes:
+ wait_until(lambda: self.is_paused(self.sink, node), timeout_sec=30,
+ err_msg="Failed to see connector transition to the PAUSED state")
+
+ # verify that we do not consume new messages while paused
+ num_messages = len(self.sink.received_messages())
+ time.sleep(10)
+ assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"
+
+ self.cc.resume_connector(self.sink.name)
+
+ for node in self.cc.nodes:
+ wait_until(lambda: self.is_running(self.sink, node), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+
+ # after resuming, we should see records consumed again
+ wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
+ err_msg="Failed to consume messages after resuming sink connector")
+
+
+ def test_pause_state_persistent(self):
+ """
+ Verify that paused state is preserved after a cluster restart.
+ """
+
+ self.setup_services()
+ self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+ self.cc.start()
+
+ self.source = VerifiableSource(self.cc)
+ self.source.start()
+
+ wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+ err_msg="Failed to see connector transition to the RUNNING state")
+
+ self.cc.pause_connector(self.source.name)
+
+ self.cc.restart()
+
+ # we should still be paused after restarting
+ for node in self.cc.nodes:
+ wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+ err_msg="Failed to see connector startup in PAUSED state")
@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
def test_file_source_and_sink(self, security_protocol):
@@ -87,10 +224,9 @@ class ConnectDistributedTest(Test):
self.cc.start()
self.logger.info("Creating connectors")
- for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
- connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
- self.cc.create_connector(connector_config)
-
+ self._start_connector("connect-file-source.properties")
+ self._start_connector("connect-file-sink.properties")
+
# Generating data on the source node should generate new records and create new output on the sink node. Timeouts
# here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
# do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile