You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/05 20:12:24 UTC

kafka git commit: KAFKA-3581: add timeouts to joins in background thread services

Repository: kafka
Updated Branches:
  refs/heads/trunk 32bf83e5a -> b6cd0e279


KAFKA-3581: add timeouts to joins in background thread services

This actually removes joins altogether, as well as references to self.worker_threads, which is best left as an implementation detail in BackgroundThreadService.

This makes use of hachikuji's recent ducktape patch, and updates the ducktape dependency to 0.5.0.

Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1297 from granders/KAFKA-3581-systest-add-join-timeout


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6cd0e27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6cd0e27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6cd0e27

Branch: refs/heads/trunk
Commit: b6cd0e2791e0e6a6ef02d069b3001ffb477f7c6c
Parents: 32bf83e
Author: Geoff Anderson <ge...@confluent.io>
Authored: Thu May 5 13:12:11 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 5 13:12:11 2016 -0700

----------------------------------------------------------------------
 .../sanity_checks/test_verifiable_producer.py   |  2 +-
 tests/kafkatest/services/console_consumer.py    | 11 ++++++++---
 .../kafkatest/services/kafka_log4j_appender.py  |  9 +++------
 .../services/performance/end_to_end_latency.py  |  4 ++--
 .../services/performance/performance.py         | 11 ++++++++++-
 .../performance/producer_performance.py         |  1 +
 .../services/replica_verification_tool.py       |  8 +++++++-
 .../kafkatest/services/simple_consumer_shell.py | 12 +++++-------
 tests/kafkatest/services/verifiable_consumer.py | 20 +++++++++-----------
 tests/kafkatest/services/verifiable_producer.py | 16 +++++++---------
 tests/setup.py                                  |  2 +-
 .../apache/kafka/tools/VerifiableConsumer.java  |  5 -----
 .../apache/kafka/tools/VerifiableProducer.java  |  6 +-----
 13 files changed, 55 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index e22d422..f1bc2a0 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -35,7 +35,7 @@ class TestVerifiableProducer(Test):
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
 
-        self.num_messages = 100
+        self.num_messages = 1000
         # This will produce to source kafka cluster
         self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
                                            max_messages=self.num_messages, throughput=1000)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5a33052..9c7f564 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -91,7 +91,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[],
-                 enable_systest_events=False):
+                 enable_systest_events=False, stop_timeout_sec=15):
         """
         Args:
             context:                    standard context
@@ -108,6 +108,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
             print_key                   if True, print each message's key in addition to its value
             enable_systest_events       if True, console consumer will print additional lifecycle-related information
                                         only available in 0.10.0 and later.
+            stop_timeout_sec            After stopping a node, wait up to stop_timeout_sec for the node to stop,
+                                        and the corresponding background thread to finish successfully.
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         BackgroundThreadService.__init__(self, context, num_nodes)
@@ -129,6 +131,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         self.client_id = client_id
         self.print_key = print_key
         self.log_level = "TRACE"
+        self.stop_timeout_sec = stop_timeout_sec
 
         self.enable_systest_events = enable_systest_events
         if self.enable_systest_events:
@@ -259,8 +262,10 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
 
     def stop_node(self, node):
         node.account.kill_process("console_consumer", allow_fail=True)
-        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
-                   err_msg="Timed out waiting for consumer to stop.")
+
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
         if self.alive(node):

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 3732bb0..c0af1a1 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -67,13 +67,10 @@ class KafkaLog4jAppender(BackgroundThreadService):
 
     def stop_node(self, node):
         node.account.kill_process("VerifiableLog4jAppender", allow_fail=False)
-        if self.worker_threads is None:
-            return
 
-        # block until the corresponding thread exits
-        if len(self.worker_threads) >= self.idx(node):
-            # Need to guard this because stop is preemptively called before the worker threads are added and started
-            self.worker_threads[self.idx(node) - 1].join()
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
         node.account.kill_process("VerifiableLog4jAppender", clean_shutdown=False, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 6d21151..2007d65 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,10 +17,11 @@ from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
 
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0, V_0_10_0_0
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
 
 import os
 
+
 class EndToEndLatencyService(PerformanceService):
     MESSAGE_BYTES = 21  # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions
 
@@ -45,7 +46,6 @@ class EndToEndLatencyService(PerformanceService):
             "collect_default": True}
     }
 
-
     def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
         super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index 1eab197..dcc1a32 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -18,15 +18,24 @@ from ducktape.services.background_thread import BackgroundThreadService
 
 class PerformanceService(BackgroundThreadService):
 
-    def __init__(self, context, num_nodes):
+    def __init__(self, context, num_nodes, stop_timeout_sec=30):
         super(PerformanceService, self).__init__(context, num_nodes)
         self.results = [None] * self.num_nodes
         self.stats = [[] for x in range(self.num_nodes)]
+        self.stop_timeout_sec = stop_timeout_sec
+
+    def stop_node(self, node):
+        node.account.kill_process("java", clean_shutdown=True, allow_fail=True)
+
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
         node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/*", allow_fail=False)
 
+
 def throughput(records_per_sec, mb_per_sec):
     """Helper method to ensure uniform representation of throughput data"""
     return {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index f4887ed..efd6c09 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -36,6 +36,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
 
     def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings={},
                  intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
+
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         PerformanceService.__init__(self, context, num_nodes)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/replica_verification_tool.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index f6374fb..7f77049 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -20,6 +20,7 @@ from kafkatest.services.security.security_config import SecurityConfig
 
 import re
 
+
 class ReplicaVerificationTool(BackgroundThreadService):
 
     logs = {
@@ -28,7 +29,7 @@ class ReplicaVerificationTool(BackgroundThreadService):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT"):
+    def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT", stop_timeout_sec=30):
         super(ReplicaVerificationTool, self).__init__(context, num_nodes)
 
         self.kafka = kafka
@@ -37,6 +38,7 @@ class ReplicaVerificationTool(BackgroundThreadService):
         self.security_protocol = security_protocol
         self.security_config = SecurityConfig(security_protocol)
         self.partition_lag = {}
+        self.stop_timeout_sec = stop_timeout_sec
 
     def _worker(self, idx, node):
         cmd = self.start_cmd(node)
@@ -76,6 +78,10 @@ class ReplicaVerificationTool(BackgroundThreadService):
     def stop_node(self, node):
         node.account.kill_process("java", clean_shutdown=True, allow_fail=True)
 
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
+
     def clean_node(self, node):
         node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/simple_consumer_shell.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/simple_consumer_shell.py b/tests/kafkatest/services/simple_consumer_shell.py
index 8deee85..c44540d 100644
--- a/tests/kafkatest/services/simple_consumer_shell.py
+++ b/tests/kafkatest/services/simple_consumer_shell.py
@@ -26,13 +26,14 @@ class SimpleConsumerShell(BackgroundThreadService):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, partition=0):
+    def __init__(self, context, num_nodes, kafka, topic, partition=0, stop_timeout_sec=30):
         super(SimpleConsumerShell, self).__init__(context, num_nodes)
 
         self.kafka = kafka
         self.topic = topic
         self.partition = partition
         self.output = ""
+        self.stop_timeout_sec = stop_timeout_sec
 
     def _worker(self, idx, node):
         cmd = self.start_cmd(node)
@@ -56,13 +57,10 @@ class SimpleConsumerShell(BackgroundThreadService):
 
     def stop_node(self, node):
         node.account.kill_process("SimpleConsumerShell", allow_fail=False)
-        if self.worker_threads is None:
-            return
 
-        # block until the corresponding thread exits
-        if len(self.worker_threads) >= self.idx(node):
-            # Need to guard this because stop is preemptively called before the worker threads are added and started
-            self.worker_threads[self.idx(node) - 1].join()
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
         node.account.kill_process("SimpleConsumerShell", clean_shutdown=False, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index d97bef3..55304dc 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -15,22 +15,22 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
-from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.directory import kafka_dir
 from kafkatest.services.kafka.version import TRUNK
-from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.kafka import TopicPartition
 
 import json
 import os
 import signal
 import subprocess
-import time
+
 
 class ConsumerState:
     Dead = 1
     Rebalancing = 3
     Joined = 2
 
+
 class ConsumerEventHandler(object):
 
     def __init__(self, node):
@@ -111,6 +111,7 @@ class ConsumerEventHandler(object):
         else:
             return None
 
+
 class VerifiableConsumer(BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout")
@@ -135,7 +136,7 @@ class VerifiableConsumer(BackgroundThreadService):
     def __init__(self, context, num_nodes, kafka, topic, group_id,
                  max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
                  assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
-                 version=TRUNK):
+                 version=TRUNK, stop_timeout_sec=30):
         super(VerifiableConsumer, self).__init__(context, num_nodes)
         self.log_level = "TRACE"
         
@@ -149,6 +150,7 @@ class VerifiableConsumer(BackgroundThreadService):
         self.prop_file = ""
         self.security_config = kafka.security_config.client_config(self.prop_file)
         self.prop_file += str(self.security_config)
+        self.stop_timeout_sec = stop_timeout_sec
 
         self.event_handlers = {}
         self.global_position = {}
@@ -268,14 +270,10 @@ class VerifiableConsumer(BackgroundThreadService):
 
     def stop_node(self, node, clean_shutdown=True):
         self.kill_node(node, clean_shutdown=clean_shutdown)
-        
-        if self.worker_threads is None:
-            return
 
-        # block until the corresponding thread exits
-        if len(self.worker_threads) >= self.idx(node):
-            # Need to guard this because stop is preemptively called before the worker threads are added and started
-            self.worker_threads[self.idx(node) - 1].join()
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
         self.kill_node(node, clean_shutdown=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 4fec776..a6a1bd9 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -43,7 +43,8 @@ class VerifiableProducer(BackgroundThreadService):
         }
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
-                 message_validator=is_int, compression_types=None, version=TRUNK, acks=None):
+                 message_validator=is_int, compression_types=None, version=TRUNK, acks=None,
+                 stop_timeout_sec=150):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -73,7 +74,7 @@ class VerifiableProducer(BackgroundThreadService):
         self.produced_count = {}
         self.clean_shutdown_nodes = set()
         self.acks = acks
-
+        self.stop_timeout_sec = stop_timeout_sec
 
     @property
     def security_config(self):
@@ -220,14 +221,11 @@ class VerifiableProducer(BackgroundThreadService):
             return True
 
     def stop_node(self, node):
-        self.kill_node(node, clean_shutdown=False, allow_fail=False)
-        if self.worker_threads is None:
-            return
+        self.kill_node(node, clean_shutdown=True, allow_fail=False)
 
-        # block until the corresponding thread exits
-        if len(self.worker_threads) >= self.idx(node):
-            # Need to guard this because stop is preemptively called before the worker threads are added and started
-            self.worker_threads[self.idx(node) - 1].join()
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
         self.kill_node(node, clean_shutdown=False, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index de3ea62..910c0a2 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -30,5 +30,5 @@ setup(name="kafkatest",
       license="apache2.0",
       packages=find_packages(),
       include_package_data=True,
-      install_requires=["ducktape==0.4.0", "requests>=2.5.0"]
+      install_requires=["ducktape==0.5.0", "requests>=2.5.0"]
       )

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 1880d7a..8db442e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -265,11 +265,6 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         public long timestamp() {
             return timestamp;
         }
-
-        @JsonProperty("class")
-        public String clazz() {
-            return VerifiableConsumer.class.getName();
-        }
     }
 
     private static class ShutdownComplete extends ConsumerEvent {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index b511fb9..30f08e8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -84,7 +84,7 @@ public class VerifiableProducer {
         this.topic = topic;
         this.throughput = throughput;
         this.maxMessages = maxMessages;
-        this.producer = new KafkaProducer<String, String>(producerProps);
+        this.producer = new KafkaProducer<>(producerProps);
         this.valuePrefix = valuePrefix;
     }
 
@@ -252,7 +252,6 @@ public class VerifiableProducer {
 
     String shutdownString() {
         Map<String, Object> data = new HashMap<>();
-        data.put("class", this.getClass().toString());
         data.put("name", "shutdown_complete");
         return toJsonString(data);
     }
@@ -265,7 +264,6 @@ public class VerifiableProducer {
         assert e != null : "Expected non-null exception.";
 
         Map<String, Object> errorData = new HashMap<>();
-        errorData.put("class", this.getClass().toString());
         errorData.put("name", "producer_send_error");
 
         errorData.put("time_ms", nowMs);
@@ -282,7 +280,6 @@ public class VerifiableProducer {
         assert recordMetadata != null : "Expected non-null recordMetadata object.";
 
         Map<String, Object> successData = new HashMap<>();
-        successData.put("class", this.getClass().toString());
         successData.put("name", "producer_send_success");
 
         successData.put("time_ms", nowMs);
@@ -349,7 +346,6 @@ public class VerifiableProducer {
                 double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
 
                 Map<String, Object> data = new HashMap<>();
-                data.put("class", producer.getClass().toString());
                 data.put("name", "tool_data");
                 data.put("sent", producer.numSent);
                 data.put("acked", producer.numAcked);