You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/05 20:12:24 UTC
kafka git commit: KAFKA-3581: add timeouts to joins in background
thread services
Repository: kafka
Updated Branches:
refs/heads/trunk 32bf83e5a -> b6cd0e279
KAFKA-3581: add timeouts to joins in background thread services
This actually removes joins altogether, as well as references to self.worker_threads, which is best left as an implementation detail in BackgroundThreadService.
This makes use of hachikuji's recent ducktape patch, and updates the ducktape dependency to 0.5.0.
Author: Geoff Anderson <ge...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1297 from granders/KAFKA-3581-systest-add-join-timeout
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6cd0e27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6cd0e27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6cd0e27
Branch: refs/heads/trunk
Commit: b6cd0e2791e0e6a6ef02d069b3001ffb477f7c6c
Parents: 32bf83e
Author: Geoff Anderson <ge...@confluent.io>
Authored: Thu May 5 13:12:11 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 5 13:12:11 2016 -0700
----------------------------------------------------------------------
.../sanity_checks/test_verifiable_producer.py | 2 +-
tests/kafkatest/services/console_consumer.py | 11 ++++++++---
.../kafkatest/services/kafka_log4j_appender.py | 9 +++------
.../services/performance/end_to_end_latency.py | 4 ++--
.../services/performance/performance.py | 11 ++++++++++-
.../performance/producer_performance.py | 1 +
.../services/replica_verification_tool.py | 8 +++++++-
.../kafkatest/services/simple_consumer_shell.py | 12 +++++-------
tests/kafkatest/services/verifiable_consumer.py | 20 +++++++++-----------
tests/kafkatest/services/verifiable_producer.py | 16 +++++++---------
tests/setup.py | 2 +-
.../apache/kafka/tools/VerifiableConsumer.java | 5 -----
.../apache/kafka/tools/VerifiableProducer.java | 6 +-----
13 files changed, 55 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index e22d422..f1bc2a0 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -35,7 +35,7 @@ class TestVerifiableProducer(Test):
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
- self.num_messages = 100
+ self.num_messages = 1000
# This will produce to source kafka cluster
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=self.num_messages, throughput=1000)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5a33052..9c7f564 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -91,7 +91,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False,
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[],
- enable_systest_events=False):
+ enable_systest_events=False, stop_timeout_sec=15):
"""
Args:
context: standard context
@@ -108,6 +108,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
print_key if True, print each message's key in addition to its value
enable_systest_events if True, console consumer will print additional lifecycle-related information
only available in 0.10.0 and later.
+ stop_timeout_sec After stopping a node, wait up to stop_timeout_sec for the node to stop,
+ and the corresponding background thread to finish successfully.
"""
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
BackgroundThreadService.__init__(self, context, num_nodes)
@@ -129,6 +131,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
self.client_id = client_id
self.print_key = print_key
self.log_level = "TRACE"
+ self.stop_timeout_sec = stop_timeout_sec
self.enable_systest_events = enable_systest_events
if self.enable_systest_events:
@@ -259,8 +262,10 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
def stop_node(self, node):
node.account.kill_process("console_consumer", allow_fail=True)
- wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
- err_msg="Timed out waiting for consumer to stop.")
+
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
if self.alive(node):
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 3732bb0..c0af1a1 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -67,13 +67,10 @@ class KafkaLog4jAppender(BackgroundThreadService):
def stop_node(self, node):
node.account.kill_process("VerifiableLog4jAppender", allow_fail=False)
- if self.worker_threads is None:
- return
- # block until the corresponding thread exits
- if len(self.worker_threads) >= self.idx(node):
- # Need to guard this because stop is preemptively called before the worker threads are added and started
- self.worker_threads[self.idx(node) - 1].join()
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
node.account.kill_process("VerifiableLog4jAppender", clean_shutdown=False, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 6d21151..2007d65 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -17,10 +17,11 @@ from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0, V_0_10_0_0
+from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
import os
+
class EndToEndLatencyService(PerformanceService):
MESSAGE_BYTES = 21 # 0.8.X messages are fixed at 21 bytes, so we'll match that for other versions
@@ -45,7 +46,6 @@ class EndToEndLatencyService(PerformanceService):
"collect_default": True}
}
-
def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index 1eab197..dcc1a32 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -18,15 +18,24 @@ from ducktape.services.background_thread import BackgroundThreadService
class PerformanceService(BackgroundThreadService):
- def __init__(self, context, num_nodes):
+ def __init__(self, context, num_nodes, stop_timeout_sec=30):
super(PerformanceService, self).__init__(context, num_nodes)
self.results = [None] * self.num_nodes
self.stats = [[] for x in range(self.num_nodes)]
+ self.stop_timeout_sec = stop_timeout_sec
+
+ def stop_node(self, node):
+ node.account.kill_process("java", clean_shutdown=True, allow_fail=True)
+
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+
def throughput(records_per_sec, mb_per_sec):
"""Helper method to ensure uniform representation of throughput data"""
return {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index f4887ed..efd6c09 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -36,6 +36,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings={},
intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=[]):
+
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
PerformanceService.__init__(self, context, num_nodes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/replica_verification_tool.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index f6374fb..7f77049 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -20,6 +20,7 @@ from kafkatest.services.security.security_config import SecurityConfig
import re
+
class ReplicaVerificationTool(BackgroundThreadService):
logs = {
@@ -28,7 +29,7 @@ class ReplicaVerificationTool(BackgroundThreadService):
"collect_default": False}
}
- def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT"):
+ def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT", stop_timeout_sec=30):
super(ReplicaVerificationTool, self).__init__(context, num_nodes)
self.kafka = kafka
@@ -37,6 +38,7 @@ class ReplicaVerificationTool(BackgroundThreadService):
self.security_protocol = security_protocol
self.security_config = SecurityConfig(security_protocol)
self.partition_lag = {}
+ self.stop_timeout_sec = stop_timeout_sec
def _worker(self, idx, node):
cmd = self.start_cmd(node)
@@ -76,6 +78,10 @@ class ReplicaVerificationTool(BackgroundThreadService):
def stop_node(self, node):
node.account.kill_process("java", clean_shutdown=True, allow_fail=True)
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
+
def clean_node(self, node):
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/simple_consumer_shell.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/simple_consumer_shell.py b/tests/kafkatest/services/simple_consumer_shell.py
index 8deee85..c44540d 100644
--- a/tests/kafkatest/services/simple_consumer_shell.py
+++ b/tests/kafkatest/services/simple_consumer_shell.py
@@ -26,13 +26,14 @@ class SimpleConsumerShell(BackgroundThreadService):
"collect_default": False}
}
- def __init__(self, context, num_nodes, kafka, topic, partition=0):
+ def __init__(self, context, num_nodes, kafka, topic, partition=0, stop_timeout_sec=30):
super(SimpleConsumerShell, self).__init__(context, num_nodes)
self.kafka = kafka
self.topic = topic
self.partition = partition
self.output = ""
+ self.stop_timeout_sec = stop_timeout_sec
def _worker(self, idx, node):
cmd = self.start_cmd(node)
@@ -56,13 +57,10 @@ class SimpleConsumerShell(BackgroundThreadService):
def stop_node(self, node):
node.account.kill_process("SimpleConsumerShell", allow_fail=False)
- if self.worker_threads is None:
- return
- # block until the corresponding thread exits
- if len(self.worker_threads) >= self.idx(node):
- # Need to guard this because stop is preemptively called before the worker threads are added and started
- self.worker_threads[self.idx(node) - 1].join()
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
node.account.kill_process("SimpleConsumerShell", clean_shutdown=False, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index d97bef3..55304dc 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -15,22 +15,22 @@
from ducktape.services.background_thread import BackgroundThreadService
-from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
+from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.services.kafka.version import TRUNK
-from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka import TopicPartition
import json
import os
import signal
import subprocess
-import time
+
class ConsumerState:
Dead = 1
Rebalancing = 3
Joined = 2
+
class ConsumerEventHandler(object):
def __init__(self, node):
@@ -111,6 +111,7 @@ class ConsumerEventHandler(object):
else:
return None
+
class VerifiableConsumer(BackgroundThreadService):
PERSISTENT_ROOT = "/mnt/verifiable_consumer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout")
@@ -135,7 +136,7 @@ class VerifiableConsumer(BackgroundThreadService):
def __init__(self, context, num_nodes, kafka, topic, group_id,
max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
- version=TRUNK):
+ version=TRUNK, stop_timeout_sec=30):
super(VerifiableConsumer, self).__init__(context, num_nodes)
self.log_level = "TRACE"
@@ -149,6 +150,7 @@ class VerifiableConsumer(BackgroundThreadService):
self.prop_file = ""
self.security_config = kafka.security_config.client_config(self.prop_file)
self.prop_file += str(self.security_config)
+ self.stop_timeout_sec = stop_timeout_sec
self.event_handlers = {}
self.global_position = {}
@@ -268,14 +270,10 @@ class VerifiableConsumer(BackgroundThreadService):
def stop_node(self, node, clean_shutdown=True):
self.kill_node(node, clean_shutdown=clean_shutdown)
-
- if self.worker_threads is None:
- return
- # block until the corresponding thread exits
- if len(self.worker_threads) >= self.idx(node):
- # Need to guard this because stop is preemptively called before the worker threads are added and started
- self.worker_threads[self.idx(node) - 1].join()
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
self.kill_node(node, clean_shutdown=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 4fec776..a6a1bd9 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -43,7 +43,8 @@ class VerifiableProducer(BackgroundThreadService):
}
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
- message_validator=is_int, compression_types=None, version=TRUNK, acks=None):
+ message_validator=is_int, compression_types=None, version=TRUNK, acks=None,
+ stop_timeout_sec=150):
"""
:param max_messages is a number of messages to be produced per producer
:param message_validator checks for an expected format of messages produced. There are
@@ -73,7 +74,7 @@ class VerifiableProducer(BackgroundThreadService):
self.produced_count = {}
self.clean_shutdown_nodes = set()
self.acks = acks
-
+ self.stop_timeout_sec = stop_timeout_sec
@property
def security_config(self):
@@ -220,14 +221,11 @@ class VerifiableProducer(BackgroundThreadService):
return True
def stop_node(self, node):
- self.kill_node(node, clean_shutdown=False, allow_fail=False)
- if self.worker_threads is None:
- return
+ self.kill_node(node, clean_shutdown=True, allow_fail=False)
- # block until the corresponding thread exits
- if len(self.worker_threads) >= self.idx(node):
- # Need to guard this because stop is preemptively called before the worker threads are added and started
- self.worker_threads[self.idx(node) - 1].join()
+ stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+ assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+ (str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
self.kill_node(node, clean_shutdown=False, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index de3ea62..910c0a2 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -30,5 +30,5 @@ setup(name="kafkatest",
license="apache2.0",
packages=find_packages(),
include_package_data=True,
- install_requires=["ducktape==0.4.0", "requests>=2.5.0"]
+ install_requires=["ducktape==0.5.0", "requests>=2.5.0"]
)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 1880d7a..8db442e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -265,11 +265,6 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
public long timestamp() {
return timestamp;
}
-
- @JsonProperty("class")
- public String clazz() {
- return VerifiableConsumer.class.getName();
- }
}
private static class ShutdownComplete extends ConsumerEvent {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6cd0e27/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index b511fb9..30f08e8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -84,7 +84,7 @@ public class VerifiableProducer {
this.topic = topic;
this.throughput = throughput;
this.maxMessages = maxMessages;
- this.producer = new KafkaProducer<String, String>(producerProps);
+ this.producer = new KafkaProducer<>(producerProps);
this.valuePrefix = valuePrefix;
}
@@ -252,7 +252,6 @@ public class VerifiableProducer {
String shutdownString() {
Map<String, Object> data = new HashMap<>();
- data.put("class", this.getClass().toString());
data.put("name", "shutdown_complete");
return toJsonString(data);
}
@@ -265,7 +264,6 @@ public class VerifiableProducer {
assert e != null : "Expected non-null exception.";
Map<String, Object> errorData = new HashMap<>();
- errorData.put("class", this.getClass().toString());
errorData.put("name", "producer_send_error");
errorData.put("time_ms", nowMs);
@@ -282,7 +280,6 @@ public class VerifiableProducer {
assert recordMetadata != null : "Expected non-null recordMetadata object.";
Map<String, Object> successData = new HashMap<>();
- successData.put("class", this.getClass().toString());
successData.put("name", "producer_send_success");
successData.put("time_ms", nowMs);
@@ -349,7 +346,6 @@ public class VerifiableProducer {
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
Map<String, Object> data = new HashMap<>();
- data.put("class", producer.getClass().toString());
data.put("name", "tool_data");
data.put("sent", producer.numSent);
data.put("acked", producer.numAcked);