Posted to dev@kafka.apache.org by xiaoyu wang <xi...@gmail.com> on 2012/08/23 18:49:05 UTC
getOffsetsBefore fails for very low volume topic?
Hello,
We recently ran into a problem getting offsets by time for a very low-traffic
topic. The traffic is so low that Kafka is always working on a single
segment file. I checked the source code and found the following: the
segment's last-modified time is always the current time, so isFound is still
false when the loop exits and no offsets are returned.
=============================================================================
package kafka.log

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic._
import java.text.NumberFormat
import java.io._
import java.nio.channels.FileChannel
import org.apache.log4j._
import kafka.message._
import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
....

def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
  val segsArray = segments.view
  var offsetTimeArray: Array[Tuple2[Long, Long]] = null
  if (segsArray.last.size > 0)
    offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
  else
    offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)

  for (i <- 0 until segsArray.length)
    // <-- the file's last-modified time stands in for the segment's timestamp
    offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
  if (segsArray.last.size > 0)
    offsetTimeArray(segsArray.length) = (segsArray.last.start +
      segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)

  var startIndex = -1
  request.time match {
    case OffsetRequest.LatestTime =>
      startIndex = offsetTimeArray.length - 1
    case OffsetRequest.EarliestTime =>
      startIndex = 0
    case _ =>
      var isFound = false
      if(logger.isDebugEnabled) {
        logger.debug("Offset time array = " + offsetTimeArray.foreach(o =>
          "%d, %d".format(o._1, o._2)))
      }
      startIndex = offsetTimeArray.length - 1
      while (startIndex >= 0 && !isFound) {
        // <-- never true when every lastModified is "now" and request.time is in the past
        if (offsetTimeArray(startIndex)._2 <= request.time)
          isFound = true
        else
          startIndex -= 1
      }
  }

  val retSize = request.maxNumOffsets.min(startIndex + 1)
  val ret = new Array[Long](retSize)
  for (j <- 0 until retSize) {
    ret(j) = offsetTimeArray(startIndex)._1
    startIndex -= 1
  }
  ret
}
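
Here is a minimal standalone sketch, using made-up offsets and timestamps
rather than real Kafka internals, that reproduces the failing search: with a
single active segment stamped with the current time, the test
lastModified <= request.time never passes for any time in the past, so the
method returns an empty array.

object LowVolumeRepro {
  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis
    // (start offset, timestamp): one segment entry plus the high-water-mark
    // entry, both stamped with the current time on a low-traffic topic.
    val offsetTimeArray = Array((0L, now), (42L, now))
    val requestTime = now - 3600 * 1000L  // "offsets before one hour ago"

    var startIndex = offsetTimeArray.length - 1
    var isFound = false
    while (startIndex >= 0 && !isFound) {
      if (offsetTimeArray(startIndex)._2 <= requestTime)
        isFound = true
      else
        startIndex -= 1
    }
    // startIndex ends at -1, so retSize = maxNumOffsets.min(startIndex + 1) = 0:
    // an empty offset array comes back even though offset 0 exists.
    println("startIndex = %d, offsets returned = %d"
      .format(startIndex, 1.min(startIndex + 1)))
  }
}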
Re: getOffsetsBefore fails for very low volume topic?
Posted by xiaoyu wang <xi...@gmail.com>.
Jun,
Thanks for confirming this. We will use the earliest offset.
-Xiaoyu
Re: getOffsetsBefore fails for very low volume topic?
Posted by Jun Rao <ju...@gmail.com>.
Xiaoyu,
Yes, this is because we rely on the last modified time as a rough estimate
of the offset time, which is not very accurate. In this case, can you fall
back to the earliest offset?
Thanks,
Jun
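
For reference, a minimal sketch of the fallback Jun describes, assuming the
0.7-era kafka.consumer.SimpleConsumer API with
getOffsetsBefore(topic, partition, time, maxNumOffsets); the object and
method names here are illustrative, so adjust to your client if the
signature differs.

import kafka.api.OffsetRequest
import kafka.consumer.SimpleConsumer

object OffsetFallback {
  // Returns the offset before `time`, or the earliest available offset when
  // the by-time lookup comes back empty (the low-volume case above).
  def offsetBeforeOrEarliest(consumer: SimpleConsumer, topic: String,
                             partition: Int, time: Long): Long = {
    val byTime = consumer.getOffsetsBefore(topic, partition, time, 1)
    if (byTime.nonEmpty)
      byTime(0)
    else
      // OffsetRequest.EarliestTime resolves to the first offset still on disk.
      consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime, 1)(0)
  }
}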