You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/23 04:36:24 UTC

svn commit: r1160529 - in /incubator/kafka/trunk/core/src/main/scala/kafka: tools/DumpLogSegments.scala tools/ProducerPerformance.scala tools/SimpleConsumerShell.scala utils/DumpLogSegments.scala

Author: nehanarkhede
Date: Tue Aug 23 02:36:24 2011
New Revision: 1160529

URL: http://svn.apache.org/viewvc?rev=1160529&view=rev
Log:
Improve the command line tools in the bin directory to use the compression feature correctly; KAFKA-112; patched by nehanarkhede; reviewed by junrao

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala
      - copied, changed from r1160521, incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala
Removed:
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

Copied: incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala (from r1160521, incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala)
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala?p2=incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala&p1=incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala&r1=1160521&r2=1160529&rev=1160529&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala Tue Aug 23 02:36:24 2011
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.utils
+package kafka.tools
 
 import java.io._
 import kafka.message._
@@ -44,7 +44,7 @@ object DumpLogSegments {
             println("offset:\t %d \t invalid".format(offset))
           if (!isNoPrint)
             println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-          offset += messageAndOffset.offset
+          offset = messageAndOffset.offset
         }
       }
     }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1160529&r1=1160528&r2=1160529&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Tue Aug 23 02:36:24 2011
@@ -219,7 +219,7 @@ object ProducerPerformance {
       props.put("zk.connect", brokerInfoList(1))
     else
       props.put("broker.list", brokerInfoList(1))
-    props.put("compression.codec", config.compressionCodec.toString)
+    props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1160529&r1=1160528&r2=1160529&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Tue Aug 23 02:36:24 2011
@@ -23,6 +23,7 @@ import kafka.api.FetchRequest
 import kafka.utils._
 import kafka.consumer._
 import kafka.server._
+import org.apache.log4j.Logger
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer
@@ -30,7 +31,9 @@ import kafka.server._
 object SimpleConsumerShell {
 
   def main(args: Array[String]): Unit = {
-    
+
+    val logger = Logger.getLogger(getClass)
+
     val parser = new OptionParser
     val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
                            .withRequiredArg
@@ -55,12 +58,22 @@ object SimpleConsumerShell {
                            .describedAs("fetchsize")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000000)
+    val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
+                           .withOptionalArg
+                           .describedAs("print offsets")
+                           .ofType(classOf[java.lang.Boolean])
+                           .defaultsTo(false)
+    val printMessageOpt = parser.accepts("print-messages", "Print the messages returned by the iterator")
+                           .withOptionalArg
+                           .describedAs("print messages")
+                           .ofType(classOf[java.lang.Boolean])
+                           .defaultsTo(false)
 
     val options = parser.parse(args : _*)
     
     for(arg <- List(urlOpt, topicOpt)) {
       if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"") 
+        logger.error("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
       }
@@ -71,31 +84,35 @@ object SimpleConsumerShell {
     val partition = options.valueOf(partitionOpt).intValue
     val startingOffset = options.valueOf(offsetOpt).longValue
     val fetchsize = options.valueOf(fetchsizeOpt).intValue
+    val printOffsets = if(options.has(printOffsetOpt)) true else false
+    val printMessages = if(options.has(printMessageOpt)) true else false
 
-    println("Starting consumer...")
+    logger.info("Starting consumer...")
     val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
     val thread = Utils.newThread("kafka-consumer", new Runnable() {
       def run() {
         var offset = startingOffset
         while(true) {
-	      val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
-	      val messageSets = consumer.multifetch(fetchRequest)
-	      for (messages <- messageSets) {
-	        println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
+          val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
+          val messageSets = consumer.multifetch(fetchRequest)
+          for (messages <- messageSets) {
+            if(logger.isDebugEnabled)
+            	logger.debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
             var consumed = 0
             for(messageAndOffset <- messages) {
-              println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+              if(printMessages)
+                logger.info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+              offset = messageAndOffset.offset
+              if(printOffsets)
+                logger.info("next offset = " + offset)
               consumed += 1
-	        }
-	        if(consumed > 0)
-	          offset += messages.validBytes
+            }
           }
-          Thread.sleep(10000)
         }
-      }  
+      }
     }, false);
     thread.start()
     thread.join()
   }
-  
+
 }