You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/23 04:36:24 UTC
svn commit: r1160529 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
tools/DumpLogSegments.scala tools/ProducerPerformance.scala
tools/SimpleConsumerShell.scala utils/DumpLogSegments.scala
Author: nehanarkhede
Date: Tue Aug 23 02:36:24 2011
New Revision: 1160529
URL: http://svn.apache.org/viewvc?rev=1160529&view=rev
Log:
Improve the command line tools in the bin directory to use the compression feature correctly; KAFKA-112; patched by nehanarkhede; reviewed by junrao
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala
- copied, changed from r1160521, incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala
Removed:
incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
Copied: incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala (from r1160521, incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala)
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala?p2=incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala&p1=incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala&r1=1160521&r2=1160529&rev=1160529&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala Tue Aug 23 02:36:24 2011
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.utils
+package kafka.tools
import java.io._
import kafka.message._
@@ -44,7 +44,7 @@ object DumpLogSegments {
println("offset:\t %d \t invalid".format(offset))
if (!isNoPrint)
println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
- offset += messageAndOffset.offset
+ offset = messageAndOffset.offset
}
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1160529&r1=1160528&r2=1160529&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Tue Aug 23 02:36:24 2011
@@ -219,7 +219,7 @@ object ProducerPerformance {
props.put("zk.connect", brokerInfoList(1))
else
props.put("broker.list", brokerInfoList(1))
- props.put("compression.codec", config.compressionCodec.toString)
+ props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1160529&r1=1160528&r2=1160529&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Tue Aug 23 02:36:24 2011
@@ -23,6 +23,7 @@ import kafka.api.FetchRequest
import kafka.utils._
import kafka.consumer._
import kafka.server._
+import org.apache.log4j.Logger
/**
* Command line program to dump out messages to standard out using the simple consumer
@@ -30,7 +31,9 @@ import kafka.server._
object SimpleConsumerShell {
def main(args: Array[String]): Unit = {
-
+
+ val logger = Logger.getLogger(getClass)
+
val parser = new OptionParser
val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
.withRequiredArg
@@ -55,12 +58,22 @@ object SimpleConsumerShell {
.describedAs("fetchsize")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000000)
+ val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
+ .withOptionalArg
+ .describedAs("print offsets")
+ .ofType(classOf[java.lang.Boolean])
+ .defaultsTo(false)
+ val printMessageOpt = parser.accepts("print-messages", "Print the messages returned by the iterator")
+ .withOptionalArg
+ .describedAs("print messages")
+ .ofType(classOf[java.lang.Boolean])
+ .defaultsTo(false)
val options = parser.parse(args : _*)
for(arg <- List(urlOpt, topicOpt)) {
if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
+ logger.error("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
@@ -71,31 +84,35 @@ object SimpleConsumerShell {
val partition = options.valueOf(partitionOpt).intValue
val startingOffset = options.valueOf(offsetOpt).longValue
val fetchsize = options.valueOf(fetchsizeOpt).intValue
+ val printOffsets = if(options.has(printOffsetOpt)) true else false
+ val printMessages = if(options.has(printMessageOpt)) true else false
- println("Starting consumer...")
+ logger.info("Starting consumer...")
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
val thread = Utils.newThread("kafka-consumer", new Runnable() {
def run() {
var offset = startingOffset
while(true) {
- val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
- val messageSets = consumer.multifetch(fetchRequest)
- for (messages <- messageSets) {
- println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
+ val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
+ val messageSets = consumer.multifetch(fetchRequest)
+ for (messages <- messageSets) {
+ if(logger.isDebugEnabled)
+ logger.debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
var consumed = 0
for(messageAndOffset <- messages) {
- println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+ if(printMessages)
+ logger.info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+ offset = messageAndOffset.offset
+ if(printOffsets)
+ logger.info("next offset = " + offset)
consumed += 1
- }
- if(consumed > 0)
- offset += messages.validBytes
+ }
}
- Thread.sleep(10000)
}
- }
+ }
}, false);
thread.start()
thread.join()
}
-
+
}