You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/09/01 07:47:22 UTC
svn commit: r1163911 -
/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
Author: jkreps
Date: Thu Sep 1 05:47:21 2011
New Revision: 1163911
URL: http://svn.apache.org/viewvc?rev=1163911&view=rev
Log:
KAFKA-124: The console consumer does not stop consuming if the program reading from its standard output dies. Check for errors on the output stream and exit if no one is listening.
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1163911&r1=1163910&r2=1163911&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Sep 1 05:47:21 2011
@@ -136,20 +136,27 @@ object ConsoleConsumer {
} catch {
case e =>
if (skipMessageOnError)
- logger.error("error processing message, skipping and resume consumption: " + e)
+ logger.error("Error processing message, skipping this message: ", e)
else
throw e
}
+ if(System.out.checkError()) {
+ // This means no one is listening to our output stream any more, time to shutdown
+ System.err.println("Unable to write to standard out, closing consumer.")
+ formatter.close()
+ connector.shutdown()
+ System.exit(1)
+ }
}
} catch {
- case e => logger.error("error processing message, stop consuming: " + e)
+ case e => logger.error("Error processing message, stopping consumer: ", e)
}
System.out.flush()
formatter.close()
connector.shutdown()
}
-
+
def tryParse(parser: OptionParser, args: Array[String]) = {
try {
parser.parse(args : _*)