You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:58 UTC
[44/50] [abbrv] kafka git commit: KAFKA-3597;
Query ConsoleConsumer and VerifiableProducer if they shutdown cleanly
KAFKA-3597; Query ConsoleConsumer and VerifiableProducer if they shutdown cleanly
Even if a test calls stop() on console_consumer or verifiable_producer, it is still possible that producer/consumer will not shutdown cleanly, and will be killed forcefully after a timeout. It will be useful for some tests to know whether a clean shutdown happened or not. This PR adds methods to console_consumer and verifiable_producer to query whether clean shutdown happened or not.
hachikuji and/or granders Please review.
Author: Anna Povzner <an...@confluent.io>
Reviewers: Jason Gustafson, Geoff Anderson, Gwen Shapira
Closes #1278 from apovzner/kafka-3597
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e29eac4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e29eac4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e29eac4b
Branch: refs/heads/0.10.0
Commit: e29eac4bbb678aa3d5a29a75f413a7b10cc2f0b1
Parents: eb50d2f
Author: Anna Povzner <an...@confluent.io>
Authored: Fri Apr 29 10:51:29 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Apr 29 10:51:29 2016 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/tools/ConsoleConsumer.scala | 8 ++++++++
tests/kafkatest/services/console_consumer.py | 15 +++++++++++----
tests/kafkatest/services/verifiable_producer.py | 6 ++++++
.../org/apache/kafka/tools/VerifiableProducer.java | 8 ++++++++
4 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index e9a43f2..8953640 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -103,6 +103,10 @@ object ConsoleConsumer extends Logging {
consumer.stop()
shutdownLatch.await()
+
+ if (conf.enableSystestEventsLogging) {
+ System.out.println("shutdown_complete")
+ }
}
})
}
@@ -253,6 +257,9 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
+ val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
+ "Log lifecycle events of the consumer in addition to logging consumed " +
+ "messages. (This is specific for system tests.)")
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@@ -260,6 +267,7 @@ object ConsoleConsumer extends Logging {
var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
val useNewConsumer = options.has(useNewConsumerOpt)
+ val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, topic must be specified.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index e5f2196..37638e2 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -123,6 +123,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
+ self.clean_shutdown_nodes = set()
self.client_id = client_id
self.print_key = print_key
self.log_level = "TRACE"
@@ -185,6 +186,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
if node.version > LATEST_0_9:
cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
+ cmd += " --enable-systest-events"
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd
@@ -226,10 +228,15 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
for line in itertools.chain([first_line], consumer_output):
msg = line.strip()
- if self.message_validator is not None:
- msg = self.message_validator(msg)
- if msg is not None:
- self.messages_consumed[idx].append(msg)
+ if msg == "shutdown_complete":
+ if node in self.clean_shutdown_nodes:
+ raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+ self.clean_shutdown_nodes.add(node)
+ else:
+ if self.message_validator is not None:
+ msg = self.message_validator(msg)
+ if msg is not None:
+ self.messages_consumed[idx].append(msg)
self.read_jmx_output(idx, node)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 500410f..4fec776 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -71,6 +71,7 @@ class VerifiableProducer(BackgroundThreadService):
self.acked_values = []
self.not_acked_values = []
self.produced_count = {}
+ self.clean_shutdown_nodes = set()
self.acks = acks
@@ -139,6 +140,11 @@ class VerifiableProducer(BackgroundThreadService):
last_produced_time = t
prev_msg = data
+ elif data["name"] == "shutdown_complete":
+ if node in self.clean_shutdown_nodes:
+ raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
+ self.clean_shutdown_nodes.add(node)
+
def start_cmd(self, node, idx):
cmd = ""
http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 9b10a9f..b511fb9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -247,6 +247,14 @@ public class VerifiableProducer {
/** Close the producer to flush any remaining messages. */
public void close() {
producer.close();
+ System.out.println(shutdownString());
+ }
+
+ String shutdownString() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("class", this.getClass().toString());
+ data.put("name", "shutdown_complete");
+ return toJsonString(data);
}
/**