You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/26 18:23:15 UTC
svn commit: r1354094 - in
/incubator/kafka/branches/0.8/core/src/main/scala/kafka:
consumer/ConsoleConsumer.scala consumer/ConsumerConfig.scala
consumer/FetcherRunnable.scala log/Log.scala
Author: junrao
Date: Tue Jun 26 16:23:14 2012
New Revision: 1354094
URL: http://svn.apache.org/viewvc?rev=1354094&view=rev
Log:
Consumer doesn't receive all data; patched by Jun Rao; reviewed by John Fung; KAFKA-372
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Jun 26 16:23:14 2012
@@ -62,7 +62,17 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
- .defaultsTo(1024 * 1024)
+ .defaultsTo(1024 * 1024)
+ val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.")
+ .withRequiredArg
+ .describedAs("bytes")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
+ val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+ .withRequiredArg
+ .describedAs("ms")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(100)
val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
.withRequiredArg
.describedAs("size")
@@ -116,6 +126,8 @@ object ConsoleConsumer extends Logging {
props.put("groupid", options.valueOf(groupIdOpt))
props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+ props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
+ props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
props.put("auto.commit", "true")
props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Tue Jun 26 16:23:14 2012
@@ -33,7 +33,7 @@ object ConsumerConfig {
val AutoOffsetReset = OffsetRequest.SmallestTimeString
val ConsumerTimeoutMs = -1
val MinFetchBytes = 1
- val MaxFetchWaitMs = 3000
+ val MaxFetchWaitMs = 100
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Tue Jun 26 16:23:14 2012
@@ -67,7 +67,8 @@ class FetcherRunnable(val name: String,
val fetchRequest = builder.build()
val start = System.currentTimeMillis
val response = simpleConsumer.fetch(fetchRequest)
- trace("Fetch completed in " + (System.currentTimeMillis - start) + " ms with max wait of " + config.maxFetchWaitMs)
+ trace("Fetch request %s completed in %d ms with max wait of %d".format(fetchRequest,
+ (System.currentTimeMillis - start), config.maxFetchWaitMs))
var read = 0L
for(infopti <- partitionTopicInfos) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1354094&r1=1354093&r2=1354094&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Jun 26 16:23:14 2012
@@ -301,7 +301,6 @@ private[kafka] class Log(val dir: File,
* The byte offset of the message that will be appended next.
*/
def nextAppendOffset: Long = {
- flush
val last = segments.view.last
last.start + last.size
}
@@ -329,6 +328,7 @@ private[kafka] class Log(val dir: File,
*/
def roll() {
lock synchronized {
+ flush
val newOffset = nextAppendOffset
val newFile = new File(dir, nameFromOffset(newOffset))
if (newFile.exists) {