You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/09 23:56:37 UTC

kafka git commit: KAFKA-3676: system tests for connector pause/resume

Repository: kafka
Updated Branches:
  refs/heads/trunk eb1de107b -> f96da638e


KAFKA-3676: system tests for connector pause/resume

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1345 from hachikuji/KAFKA-3676


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f96da638
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f96da638
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f96da638

Branch: refs/heads/trunk
Commit: f96da638ee9a4e1e47ece1ea337ee071d911c3da
Parents: eb1de10
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon May 9 16:56:32 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon May 9 16:56:32 2016 -0700

----------------------------------------------------------------------
 .../storage/KafkaConfigBackingStore.java        |   2 +-
 tests/kafkatest/services/connect.py             |  28 +++-
 tests/kafkatest/services/kafka/kafka.py         |   2 +-
 .../tests/connect/connect_distributed_test.py   | 146 ++++++++++++++++++-
 4 files changed, 166 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index a894f31..9a93a4e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -469,7 +469,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
                         try {
                             TargetState state = TargetState.valueOf((String) targetState);
-                            log.trace("Setting target state for connector {} to {}", connectorName, targetState);
+                            log.debug("Setting target state for connector {} to {}", connectorName, targetState);
                             connectorTargetStates.put(connectorName, state);
                         } catch (IllegalArgumentException e) {
                             log.error("Invalid target state for connector ({}): {}", connectorName, targetState);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index cf67c30..aad9ff3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -88,12 +88,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
-    def restart(self):
+    def restart(self, clean_shutdown=True):  # restarts every node in turn via restart_node
         # We don't want to do any clean up here, just restart the process.
         for node in self.nodes:
             self.logger.info("Restarting Kafka Connect on " + str(node.account))
-            self.stop_node(node)
-            self.start_node(node)
+            self.restart_node(node, clean_shutdown)
+
+    def restart_node(self, node, clean_shutdown=True):  # stop then start one node; clean_shutdown is forwarded to stop_node
+        self.stop_node(node, clean_shutdown)
+        self.start_node(node)
 
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
@@ -128,6 +131,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
         return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff)
 
+    def get_connector_status(self, name, node=None):
+        return self._rest('/connectors/' + name + '/status', node=node)
+
+    def pause_connector(self, name, node=None):
+        return self._rest('/connectors/' + name + '/pause', method="PUT", node=node)
+
+    def resume_connector(self, name, node=None):
+        return self._rest('/connectors/' + name + '/resume', method="PUT", node=node)
+
     def _rest(self, path, body=None, node=None, method="GET"):
         if node is None:
             node = random.choice(self.nodes)
@@ -139,7 +151,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.logger.debug("%s %s response: %d", url, method, resp.status_code)
         if resp.status_code > 400:
             raise ConnectRestError(resp.status_code, resp.text, resp.url)
-        if resp.status_code == 204:
+        if resp.status_code == 204 or resp.status_code == 202:
             return None
         else:
             return resp.json()
@@ -185,7 +197,7 @@ class ConnectStandaloneService(ConnectServiceBase):
         self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
         with node.account.monitor_log(self.LOG_FILE) as monitor:
             node.account.ssh(self.start_cmd(node, remote_connector_configs))
-            monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
+            monitor.wait_until('Kafka Connect started', timeout_sec=30, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -298,6 +310,12 @@ class VerifiableSink(VerifiableConnector):
         self.tasks = tasks
         self.topics = topics
 
+    def flushed_messages(self):
+        return [m for m in self.messages() if 'flushed' in m and m['flushed']]
+
+    def received_messages(self):
+        return [m for m in self.messages() if 'flushed' not in m or not m['flushed']]
+
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
         self.cc.create_connector({

http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 6ff7d0c..334069d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -227,7 +227,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         for pid in pids:
             node.account.signal(pid, sig, allow_fail=False)
-        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Kafka node failed to stop")
 
     def clean_node(self, node):
         JmxMixin.clean_node(self, node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f96da638/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 698a827..d3ae2e1 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -17,13 +17,14 @@ from ducktape.tests.test import Test
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
+from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 import subprocess, itertools, time
 from collections import Counter
+import operator
 
 class ConnectDistributedTest(Test):
     """
@@ -73,6 +74,142 @@ class ConnectDistributedTest(Test):
         self.zk.start()
         self.kafka.start()
 
+    def _start_connector(self, config_file):
+        connector_props = self.render(config_file)
+        connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+        self.cc.create_connector(connector_config)
+
+    def _connector_status(self, connector, node=None):
+        try:
+            return self.cc.get_connector_status(connector, node)
+        except ConnectRestError:
+            return None
+
+    def _has_state(self, status, state):
+        return status is not None and status['connector']['state'] == state
+
+    def _all_tasks_have_state(self, status, task_count, state):
+        if status is None:
+            return False
+
+        tasks = status['tasks']
+        if len(tasks) != task_count:
+            return False
+
+        return all(task['state'] == state for task in tasks)
+
+    def is_running(self, connector, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._has_state(status, 'RUNNING') and self._all_tasks_have_state(status, connector.tasks, 'RUNNING')
+
+    def is_paused(self, connector, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._has_state(status, 'PAUSED') and self._all_tasks_have_state(status, connector.tasks, 'PAUSED')
+
+    def test_pause_and_resume_source(self):
+        """
+        Verify that source connectors stop producing records when paused and begin again after
+        being resumed.
+        """
+
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc)
+        self.source.start()
+
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        
+        self.cc.pause_connector(self.source.name)
+
+        # wait until all nodes report the paused transition
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the PAUSED state")
+
+        # verify that no new messages are produced while paused, sampled over a 10 second window
+        num_messages = len(self.source.messages())
+        time.sleep(10)
+        assert num_messages == len(self.source.messages()), "Paused source connector should not produce any messages"
+
+        self.cc.resume_connector(self.source.name)
+
+        for node in self.cc.nodes:  # every worker must report RUNNING again, not just one
+            wait_until(lambda: self.is_running(self.source, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the RUNNING state")
+
+        # after resuming, we should see records produced again
+        wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
+                   err_msg="Failed to produce messages after resuming source connector")
+
+    def test_pause_and_resume_sink(self):
+        """
+        Verify that sink connectors stop consuming records when paused and begin again after
+        being resumed.
+        """
+
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        # use the verifiable source to produce a steady stream of messages
+        self.source = VerifiableSource(self.cc)
+        self.source.start()
+
+        self.sink = VerifiableSink(self.cc)
+        self.sink.start()
+
+        wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+
+        self.cc.pause_connector(self.sink.name)
+
+        # wait until all nodes report the paused transition
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_paused(self.sink, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the PAUSED state")
+
+        # verify that we do not consume new messages while paused
+        num_messages = len(self.sink.received_messages())
+        time.sleep(10)
+        assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"
+
+        self.cc.resume_connector(self.sink.name)
+
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_running(self.sink, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the RUNNING state")
+
+        # after resuming, we should see records consumed again
+        wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
+                   err_msg="Failed to consume messages after resuming sink connector")
+
+
+    def test_pause_state_persistent(self):
+        """
+        Verify that paused state is preserved after a cluster restart.
+        """
+
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc)
+        self.source.start()
+
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        
+        self.cc.pause_connector(self.source.name)
+
+        self.cc.restart()  # NOTE(review): no wait for PAUSED before restarting; pause appears to be acked with 202 (async) -- confirm the target state is durably written first
+
+        # we should still be paused after restarting
+        for node in self.cc.nodes:  # status is queried through each worker's own REST endpoint
+            wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+                       err_msg="Failed to see connector startup in PAUSED state")
 
     @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
     def test_file_source_and_sink(self, security_protocol):
@@ -87,10 +224,9 @@ class ConnectDistributedTest(Test):
         self.cc.start()
 
         self.logger.info("Creating connectors")
-        for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
-            connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
-            self.cc.create_connector(connector_config)
-
+        self._start_connector("connect-file-source.properties")
+        self._start_connector("connect-file-sink.properties")
+        
         # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
         # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
         # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile