You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/07/20 14:01:24 UTC

[kafka] branch 2.4 updated (2eb10ec -> a39f6a8)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a change to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 2eb10ec  KAFKA-10160: Kafka MM2 consumer configuration (#8921)
     new 88b9c1a  KAFKA-10286: Connect system tests should wait for workers to join group (#9040)
     new a39f6a8  KAFKA-10295: Wait for connector recovery in test_bounce (#9043)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/kafkatest/services/connect.py                | 25 ++++++++++++++++++----
 .../tests/connect/connect_distributed_test.py      | 14 ++++++++++++
 2 files changed, 35 insertions(+), 4 deletions(-)


[kafka] 02/02: KAFKA-10295: Wait for connector recovery in test_bounce (#9043)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a39f6a80ff748309ce5e9d12bd849cb7abf3669f
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Mon Jul 20 06:50:05 2020 -0700

    KAFKA-10295: Wait for connector recovery in test_bounce (#9043)
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
---
 tests/kafkatest/tests/connect/connect_distributed_test.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 244bc64..107c0d4 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -387,8 +387,22 @@ class ConnectDistributedTest(Test):
                 # through the test.
                 time.sleep(15)
 
+        # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+        time.sleep(60)
 
+        # Allow the connectors to startup, recover, and exit cleanly before
+        # ending the test. It's possible for the source connector to make
+        # uncommitted progress, and for the sink connector to read messages that
+        # have not been committed yet, and fail a later assertion.
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
         self.source.stop()
+        # Ensure that the sink connector has an opportunity to read all
+        # committed messages from the source connector.
+        wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
         self.sink.stop()
         self.cc.stop()
 


[kafka] 01/02: KAFKA-10286: Connect system tests should wait for workers to join group (#9040)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 88b9c1a9bc2a2874c1949112bc0491cd1d434385
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Mon Jul 20 06:48:02 2020 -0700

    KAFKA-10286: Connect system tests should wait for workers to join group (#9040)
    
    Currently, the system tests `connect_distributed_test` and `connect_rest_test` only wait for the REST API to come up.
    The startup of the worker includes an asynchronous process for joining the worker group and syncing with other workers.
    There are some situations in which this sync takes an unusually long time, and the test continues without all workers up.
    This leads to flaky test failures, because worker joins are not given sufficient time to time out and retry unless the test waits explicitly.
    
    This changes the `ConnectDistributedTest` to wait for the `Joined group` message to be printed to the logs before continuing with tests. I've enabled this behavior by default, as it's a superset of the checks that were performed by default before.
    
    This log message is present in every version of DistributedHerder that I could find, in slightly different forms, but always with `Joined group` at the beginning of the log message. This change should be safe to backport to any branch.
    
    Signed-off-by: Greg Harris <gr...@confluent.io>
    Author: Greg Harris <gr...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 tests/kafkatest/services/connect.py | 25 +++++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 96e0d54..c97c8d0 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -43,13 +43,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     CONNECT_REST_PORT = 8083
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
 
-    # Currently the Connect worker supports waiting on three modes:
+    # Currently the Connect worker supports waiting on four modes:
     STARTUP_MODE_INSTANT = 'INSTANT'
     """STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
     STARTUP_MODE_LOAD = 'LOAD'
     """STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
     STARTUP_MODE_LISTEN = 'LISTEN'
     """STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""
+    STARTUP_MODE_JOIN = 'JOIN'
+    """STARTUP_MODE_JOIN: Start Connect worker and return after joining the group."""
 
     logs = {
         "connect_log": {
@@ -114,8 +116,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             self.logger.debug("REST resources are not loaded yet")
             return False
 
-    def start(self, mode=STARTUP_MODE_LISTEN):
-        self.startup_mode = mode
+    def start(self, mode=None):
+        if mode:
+            self.startup_mode = mode
         super(ConnectServiceBase, self).start()
 
     def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
@@ -136,6 +139,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                    err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
                    (str(node.account), self.startup_mode))
 
+    def start_and_wait_to_join_group(self, node, worker_type, remote_connector_configs):
+        if worker_type != 'distributed':
+            raise RuntimeError("Cannot wait for joined group message for %s" % worker_type)
+        with node.account.monitor_log(self.LOG_FILE) as monitor:
+            self.start_and_return_immediately(node, worker_type, remote_connector_configs)
+            monitor.wait_until('Joined group', timeout_sec=self.startup_timeout_sec,
+                               err_msg="Never saw message indicating Kafka Connect joined group on node: " +
+                                       "%s in condition mode: %s" % (str(node.account), self.startup_mode))
+
     def stop_node(self, node, clean_shutdown=True):
         self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
         pids = self.pids(node)
@@ -307,6 +319,8 @@ class ConnectStandaloneService(ConnectServiceBase):
             self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
         elif self.startup_mode == self.STARTUP_MODE_INSTANT:
             self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
+        elif self.startup_mode == self.STARTUP_MODE_JOIN:
+            self.start_and_wait_to_join_group(node, 'standalone', remote_connector_configs)
         else:
             # The default mode is to wait until the complete startup of the worker
             self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)
@@ -321,6 +335,7 @@ class ConnectDistributedService(ConnectServiceBase):
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
                  configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
         super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
+        self.startup_mode = self.STARTUP_MODE_JOIN
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
         self.status_topic = status_topic
@@ -354,9 +369,11 @@ class ConnectDistributedService(ConnectServiceBase):
             self.start_and_wait_to_load_plugins(node, 'distributed', '')
         elif self.startup_mode == self.STARTUP_MODE_INSTANT:
             self.start_and_return_immediately(node, 'distributed', '')
+        elif self.startup_mode == self.STARTUP_MODE_LISTEN:
+            self.start_and_wait_to_start_listening(node, 'distributed', '')
         else:
             # The default mode is to wait until the complete startup of the worker
-            self.start_and_wait_to_start_listening(node, 'distributed', '')
+            self.start_and_wait_to_join_group(node, 'distributed', '')
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")