You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:58 UTC

[44/50] [abbrv] kafka git commit: KAFKA-3597; Query ConsoleConsumer and VerifiableProducer if they shutdown cleanly

KAFKA-3597; Query ConsoleConsumer and VerifiableProducer if they shutdown cleanly

Even if a test calls stop() on console_consumer or verifiable_producer, it is still possible that the producer/consumer will not shut down cleanly and will be killed forcefully after a timeout. It will be useful for some tests to know whether a clean shutdown happened or not. This PR adds methods to console_consumer and verifiable_producer to query whether a clean shutdown happened or not.

hachikuji and/or granders, please review.

Author: Anna Povzner <an...@confluent.io>

Reviewers: Jason Gustafson, Geoff Anderson, Gwen Shapira

Closes #1278 from apovzner/kafka-3597


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e29eac4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e29eac4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e29eac4b

Branch: refs/heads/0.10.0
Commit: e29eac4bbb678aa3d5a29a75f413a7b10cc2f0b1
Parents: eb50d2f
Author: Anna Povzner <an...@confluent.io>
Authored: Fri Apr 29 10:51:29 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Apr 29 10:51:29 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/tools/ConsoleConsumer.scala |  8 ++++++++
 tests/kafkatest/services/console_consumer.py         | 15 +++++++++++----
 tests/kafkatest/services/verifiable_producer.py      |  6 ++++++
 .../org/apache/kafka/tools/VerifiableProducer.java   |  8 ++++++++
 4 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index e9a43f2..8953640 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -103,6 +103,10 @@ object ConsoleConsumer extends Logging {
         consumer.stop()
 
         shutdownLatch.await()
+
+        if (conf.enableSystestEventsLogging) {
+          System.out.println("shutdown_complete")
+        }
       }
     })
   }
@@ -253,6 +257,9 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("deserializer for values")
       .ofType(classOf[String])
+    val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
+                                                       "Log lifecycle events of the consumer in addition to logging consumed " +
+                                                       "messages. (This is specific for system tests.)")
 
     if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@@ -260,6 +267,7 @@ object ConsoleConsumer extends Logging {
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
     val useNewConsumer = options.has(useNewConsumerOpt)
+    val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
 
     // If using old consumer, exactly one of whitelist/blacklist/topic is required.
     // If using new consumer, topic must be specified.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index e5f2196..37638e2 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -123,6 +123,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         self.from_beginning = from_beginning
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
+        self.clean_shutdown_nodes = set()
         self.client_id = client_id
         self.print_key = print_key
         self.log_level = "TRACE"
@@ -185,6 +186,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         if node.version > LATEST_0_9:
             cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
 
+        cmd += " --enable-systest-events"
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
 
@@ -226,10 +228,15 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
 
             for line in itertools.chain([first_line], consumer_output):
                 msg = line.strip()
-                if self.message_validator is not None:
-                    msg = self.message_validator(msg)
-                if msg is not None:
-                    self.messages_consumed[idx].append(msg)
+                if msg == "shutdown_complete":
+                    if node in self.clean_shutdown_nodes:
+                        raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+                    self.clean_shutdown_nodes.add(node)
+                else:
+                    if self.message_validator is not None:
+                        msg = self.message_validator(msg)
+                    if msg is not None:
+                        self.messages_consumed[idx].append(msg)
 
             self.read_jmx_output(idx, node)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 500410f..4fec776 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -71,6 +71,7 @@ class VerifiableProducer(BackgroundThreadService):
         self.acked_values = []
         self.not_acked_values = []
         self.produced_count = {}
+        self.clean_shutdown_nodes = set()
         self.acks = acks
 
 
@@ -139,6 +140,11 @@ class VerifiableProducer(BackgroundThreadService):
                         last_produced_time = t
                         prev_msg = data
 
+                    elif data["name"] == "shutdown_complete":
+                        if node in self.clean_shutdown_nodes:
+                            raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
+                        self.clean_shutdown_nodes.add(node)
+
     def start_cmd(self, node, idx):
 
         cmd = ""

http://git-wip-us.apache.org/repos/asf/kafka/blob/e29eac4b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 9b10a9f..b511fb9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -247,6 +247,14 @@ public class VerifiableProducer {
     /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
+        System.out.println(shutdownString());
+    }
+
+    String shutdownString() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("class", this.getClass().toString());
+        data.put("name", "shutdown_complete");
+        return toJsonString(data);
     }
 
     /**