You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/26 18:23:15 UTC

svn commit: r1354094 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ConsoleConsumer.scala consumer/ConsumerConfig.scala consumer/FetcherRunnable.scala log/Log.scala

Author: junrao
Date: Tue Jun 26 16:23:14 2012
New Revision: 1354094

URL: http://svn.apache.org/viewvc?rev=1354094&view=rev
Log:
Consumer doesn't receive all data; patched by Jun Rao; reviewed by John Fung; KAFKA-372

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Jun 26 16:23:14 2012
@@ -62,7 +62,17 @@ object ConsoleConsumer extends Logging {
                            .withRequiredArg
                            .describedAs("size")
                            .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1024 * 1024)   
+                           .defaultsTo(1024 * 1024)
+    val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.")
+                           .withRequiredArg
+                           .describedAs("bytes")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1)
+    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+                           .withRequiredArg
+                           .describedAs("ms")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(100)
     val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
                            .withRequiredArg
                            .describedAs("size")
@@ -116,6 +126,8 @@ object ConsoleConsumer extends Logging {
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
     props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+    props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
+    props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
     props.put("auto.commit", "true")
     props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
     props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Tue Jun 26 16:23:14 2012
@@ -33,7 +33,7 @@ object ConsumerConfig {
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
   val MinFetchBytes = 1
-  val MaxFetchWaitMs = 3000
+  val MaxFetchWaitMs = 100
   val MirrorTopicsWhitelist = ""
   val MirrorTopicsBlacklist = ""
   val MirrorConsumerNumThreads = 1

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Tue Jun 26 16:23:14 2012
@@ -67,7 +67,8 @@ class FetcherRunnable(val name: String,
         val fetchRequest = builder.build()
         val start = System.currentTimeMillis
         val response = simpleConsumer.fetch(fetchRequest)
-        trace("Fetch completed in " + (System.currentTimeMillis - start) + " ms with max wait of " + config.maxFetchWaitMs)
+        trace("Fetch request %s completed in %d ms with max wait of %d".format(fetchRequest,
+          (System.currentTimeMillis - start), config.maxFetchWaitMs))
 
         var read = 0L
         for(infopti <- partitionTopicInfos) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Jun 26 16:23:14 2012
@@ -301,7 +301,6 @@ private[kafka] class Log(val dir: File, 
    * The byte offset of the message that will be appended next.
    */
   def nextAppendOffset: Long = {
-    flush
     val last = segments.view.last
     last.start + last.size
   }
@@ -329,6 +328,7 @@ private[kafka] class Log(val dir: File, 
    */
   def roll() {
     lock synchronized {
+      flush
       val newOffset = nextAppendOffset
       val newFile = new File(dir, nameFromOffset(newOffset))
       if (newFile.exists) {