You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/07 04:34:48 UTC
kafka git commit: KAFKA-3501: Console consumer process hangs on exit
Repository: kafka
Updated Branches:
refs/heads/trunk feab5a374 -> d9f052acc
KAFKA-3501: Console consumer process hangs on exit
- replace `System.exit(1)` with a regular `return` so that the latch blocking the shutdown hook thread is released and the JVM can shut down
- provide `PrintStream` to the `process` method in order to ease unit testing
Author: Sebastien Launay <se...@opendns.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1185 from slaunay/bugfix/KAFKA-3501-console-consumer-hangs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9f052ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9f052ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9f052ac
Branch: refs/heads/trunk
Commit: d9f052acc396871801edee13ef0a6042a9af6626
Parents: feab5a3
Author: Sebastien Launay <se...@opendns.com>
Authored: Mon Jun 6 21:34:33 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jun 6 21:34:33 2016 -0700
----------------------------------------------------------------------
.../scala/kafka/tools/ConsoleConsumer.scala | 20 ++++++++-----
.../unit/kafka/tools/ConsoleConsumerTest.scala | 31 ++++++++++++++++++--
2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9f052ac/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 8953640..3b7a214 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -70,9 +70,10 @@ object ConsoleConsumer extends Logging {
addShutdownHook(consumer, conf)
try {
- process(conf.maxMessages, conf.formatter, consumer, conf.skipMessageOnError)
+ process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
} finally {
consumer.cleanup()
+ conf.formatter.close()
reportRecordCount()
// if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
@@ -111,7 +112,7 @@ object ConsoleConsumer extends Logging {
})
}
- def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, skipMessageOnError: Boolean) {
+ def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, output: PrintStream, skipMessageOnError: Boolean) {
while (messageCount < maxMessages || maxMessages == -1) {
val msg: BaseConsumerRecord = try {
consumer.receive()
@@ -132,7 +133,7 @@ object ConsoleConsumer extends Logging {
messageCount += 1
try {
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
- msg.timestampType, 0, 0, 0, msg.key, msg.value), System.out)
+ msg.timestampType, 0, 0, 0, msg.key, msg.value), output)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
@@ -142,7 +143,10 @@ object ConsoleConsumer extends Logging {
throw e
}
}
- checkErr(formatter)
+ if (checkErr(output, formatter)) {
+ // Consumer will be closed
+ return
+ }
}
}
@@ -150,13 +154,13 @@ object ConsoleConsumer extends Logging {
System.err.println(s"Processed a total of $messageCount messages")
}
- def checkErr(formatter: MessageFormatter) {
- if (System.out.checkError()) {
+ def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
+ val gotError = output.checkError()
+ if (gotError) {
// This means no one is listening to our output stream any more, time to shutdown
System.err.println("Unable to write to standard out, closing consumer.")
- formatter.close()
- System.exit(1)
}
+ gotError
}
def getOldConsumerProps(config: ConsumerConfig): Properties = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9f052ac/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 31b3211..c3ebade 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,7 +17,7 @@
package kafka.tools
-import java.io.FileOutputStream
+import java.io.{PrintStream, FileOutputStream}
import kafka.common.MessageFormatter
import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
@@ -47,7 +47,34 @@ class ConsoleConsumerTest extends JUnitSuite {
EasyMock.replay(formatter)
//Test
- ConsoleConsumer.process(messageLimit, formatter, consumer, true)
+ ConsoleConsumer.process(messageLimit, formatter, consumer, System.out, true)
+ }
+
+ @Test
+ def shouldStopWhenOutputCheckErrorFails() {
+ //Mocks
+ val consumer = EasyMock.createNiceMock(classOf[BaseConsumer])
+ val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+ val printStream = EasyMock.createNiceMock(classOf[PrintStream])
+
+ //Stubs
+ val record = new BaseConsumerRecord(topic = "foo", partition = 1, offset = 1, key = Array[Byte](), value = Array[Byte]())
+
+ //Expectations
+ EasyMock.expect(consumer.receive()).andReturn(record)
+ EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.eq(printStream)))
+ //Simulate an error on System.out after the first record has been printed
+ EasyMock.expect(printStream.checkError()).andReturn(true)
+
+ EasyMock.replay(consumer)
+ EasyMock.replay(formatter)
+ EasyMock.replay(printStream)
+
+ //Test
+ ConsoleConsumer.process(-1, formatter, consumer, printStream, true)
+
+ //Verify
+ EasyMock.verify(consumer, formatter, printStream)
}
@Test