You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/09/01 07:47:22 UTC

svn commit: r1163911 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

Author: jkreps
Date: Thu Sep  1 05:47:21 2011
New Revision: 1163911

URL: http://svn.apache.org/viewvc?rev=1163911&view=rev
Log:
KAFKA-124: The console consumer does not stop consuming if the program reading from its standard output dies. Check for errors on the output stream and exit if no one is listening.


Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1163911&r1=1163910&r2=1163911&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Sep  1 05:47:21 2011
@@ -136,20 +136,27 @@ object ConsoleConsumer {
         } catch {
           case e =>
             if (skipMessageOnError)
-              logger.error("error processing message, skipping and resume consumption: " + e)
+              logger.error("Error processing message, skipping this message: ", e)
             else
               throw e
         }
+        if(System.out.checkError()) { 
+          // This means no one is listening to our output stream any more, so it is time to shut down.
+          System.err.println("Unable to write to standard out, closing consumer.")
+          formatter.close()
+          connector.shutdown()
+          System.exit(1)
+        }
       }
     } catch {
-      case e => logger.error("error processing message, stop consuming: " + e)
+      case e => logger.error("Error processing message, stopping consumer: ", e)
     }
       
     System.out.flush()
     formatter.close()
     connector.shutdown()
   }
-  
+
   def tryParse(parser: OptionParser, args: Array[String]) = {
     try {
       parser.parse(args : _*)