You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/07 04:34:48 UTC

kafka git commit: KAFKA-3501: Console consumer process hangs on exit

Repository: kafka
Updated Branches:
  refs/heads/trunk feab5a374 -> d9f052acc


KAFKA-3501: Console consumer process hangs on exit

- replace `System.exit(1)` with a regular `return` so that the latch gets released; the shutdown hook thread was blocked waiting on that latch, which prevented the JVM from shutting down
- provide `PrintStream` to the `process` method in order to ease unit testing

Author: Sebastien Launay <se...@opendns.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1185 from slaunay/bugfix/KAFKA-3501-console-consumer-hangs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9f052ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9f052ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9f052ac

Branch: refs/heads/trunk
Commit: d9f052acc396871801edee13ef0a6042a9af6626
Parents: feab5a3
Author: Sebastien Launay <se...@opendns.com>
Authored: Mon Jun 6 21:34:33 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jun 6 21:34:33 2016 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 20 ++++++++-----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 31 ++++++++++++++++++--
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9f052ac/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 8953640..3b7a214 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -70,9 +70,10 @@ object ConsoleConsumer extends Logging {
     addShutdownHook(consumer, conf)
 
     try {
-      process(conf.maxMessages, conf.formatter, consumer, conf.skipMessageOnError)
+      process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
     } finally {
       consumer.cleanup()
+      conf.formatter.close()
       reportRecordCount()
 
       // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
@@ -111,7 +112,7 @@ object ConsoleConsumer extends Logging {
     })
   }
 
-  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, skipMessageOnError: Boolean) {
+  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, output: PrintStream, skipMessageOnError: Boolean) {
     while (messageCount < maxMessages || maxMessages == -1) {
       val msg: BaseConsumerRecord = try {
         consumer.receive()
@@ -132,7 +133,7 @@ object ConsoleConsumer extends Logging {
       messageCount += 1
       try {
         formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
-                                             msg.timestampType, 0, 0, 0, msg.key, msg.value), System.out)
+                                             msg.timestampType, 0, 0, 0, msg.key, msg.value), output)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {
@@ -142,7 +143,10 @@ object ConsoleConsumer extends Logging {
             throw e
           }
       }
-      checkErr(formatter)
+      if (checkErr(output, formatter)) {
+        // Consumer will be closed
+        return
+      }
     }
   }
 
@@ -150,13 +154,13 @@ object ConsoleConsumer extends Logging {
     System.err.println(s"Processed a total of $messageCount messages")
   }
 
-  def checkErr(formatter: MessageFormatter) {
-    if (System.out.checkError()) {
+  def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
+    val gotError = output.checkError()
+    if (gotError) {
       // This means no one is listening to our output stream any more, time to shutdown
       System.err.println("Unable to write to standard out, closing consumer.")
-      formatter.close()
-      System.exit(1)
     }
+    gotError
   }
 
   def getOldConsumerProps(config: ConsumerConfig): Properties = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9f052ac/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 31b3211..c3ebade 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.tools
 
-import java.io.FileOutputStream
+import java.io.{PrintStream, FileOutputStream}
 
 import kafka.common.MessageFormatter
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
@@ -47,7 +47,34 @@ class ConsoleConsumerTest extends JUnitSuite {
     EasyMock.replay(formatter)
 
     //Test
-    ConsoleConsumer.process(messageLimit, formatter, consumer, true)
+    ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)
+  }
+
+  @Test
+  def shouldStopWhenOutputCheckErrorFails() {
+    //Mocks
+    val consumer = EasyMock.createNiceMock(classOf[BaseConsumer])
+    val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+    val printStream = EasyMock.createNiceMock(classOf[PrintStream])
+
+    //Stubs
+    val record = new BaseConsumerRecord(topic = "foo", partition = 1, offset = 1, key = Array[Byte](), value = Array[Byte]())
+
+    //Expectations
+    EasyMock.expect(consumer.receive()).andReturn(record)
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.eq(printStream)))
+    //Simulate an error on System.out after the first record has been printed
+    EasyMock.expect(printStream.checkError()).andReturn(true)
+
+    EasyMock.replay(consumer)
+    EasyMock.replay(formatter)
+    EasyMock.replay(printStream)
+
+    //Test
+    ConsoleConsumer.process(-1, formatter, consumer, printStream, true)
+
+    //Verify
+    EasyMock.verify(consumer, formatter, printStream)
   }
 
   @Test