Posted to dev@kafka.apache.org by xiaoyu wang <xi...@gmail.com> on 2012/08/23 18:49:05 UTC

getOffsetsBefore fails for very low volume topic?

Hello,

We recently ran into a problem getting offsets by time for a very low traffic
topic. Basically, the traffic is so low that Kafka always works on a single
segment file. I checked the source code and found the following: the segment's
last modified time is always current, so isFound is still false when the loop
exits and no offsets are returned.


=============================================================================
package kafka.log

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic._
import java.text.NumberFormat
import java.io._
import java.nio.channels.FileChannel
import org.apache.log4j._
import kafka.message._
import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
....

 def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
    val segsArray = segments.view
    var offsetTimeArray: Array[Tuple2[Long, Long]] = null
    if (segsArray.last.size > 0)
      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
    else
      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)

    // each entry pairs a segment's starting offset with the segment file's last modified time
    for (i <- 0 until segsArray.length)
      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
    if (segsArray.last.size > 0)
      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)

    var startIndex = -1
    request.time match {
      case OffsetRequest.LatestTime =>
        startIndex = offsetTimeArray.length - 1
      case OffsetRequest.EarliestTime =>
        startIndex = 0
      case _ =>
          var isFound = false
          if(logger.isDebugEnabled) {
            logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
          }
          startIndex = offsetTimeArray.length - 1
          while (startIndex >= 0 && !isFound) {
            // for the active segment the last modified time is "now", so this is never true
            if (offsetTimeArray(startIndex)._2 <= request.time)
              isFound = true
            else
              startIndex -= 1
          }
    }

    val retSize = request.maxNumOffsets.min(startIndex + 1)
    val ret = new Array[Long](retSize)
    for (j <- 0 until retSize) {
      ret(j) = offsetTimeArray(startIndex)._1
      startIndex -= 1
    }
    ret
  }
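
To make the failure concrete, here is a small standalone sketch (not Kafka code;
all values are made up) of the scan above when there is a single segment that is
still being written to. Both timestamps in the array are effectively "now", so
the scan never finds an entry older than the requested time:

object SingleSegmentScan {
  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis
    // one active segment: (start offset, lastModified) plus the (high watermark, now) entry
    val offsetTimeArray = Array((0L, now), (1234L, now))
    val requestTime = now - 60 * 60 * 1000L   // caller asks for offsets before one hour ago
    val maxNumOffsets = 1

    var isFound = false
    var startIndex = offsetTimeArray.length - 1
    while (startIndex >= 0 && !isFound) {
      if (offsetTimeArray(startIndex)._2 <= requestTime) isFound = true
      else startIndex -= 1
    }

    // startIndex ends up as -1, so retSize is 0 and the caller gets an empty array back
    val retSize = maxNumOffsets.min(startIndex + 1)
    println("startIndex = " + startIndex + ", retSize = " + retSize)
  }
}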

Re: getOffsetsBefore fails for very low volume topic?

Posted by xiaoyu wang <xi...@gmail.com>.
Jun,

Thanks for confirming this. We will use the earliest offset.


-Xiaoyu

On Thu, Aug 23, 2012 at 10:37 AM, Jun Rao <ju...@gmail.com> wrote:

> Xiaoyu,
>
> Yes, this is because we rely on the last modified time as a rough estimate
> of the offset time, which is not very accurate. In this case, can you fall
> back to the earliest offset?
>
> Thanks,
>
> Jun

Re: getOffsetsBefore fails for very low volume topic?

Posted by Jun Rao <ju...@gmail.com>.
Xiaoyu,

Yes, this is because we rely on the last modified time as a rough estimate
of the offset time, which is not very accurate. In this case, can you fall
back to the earliest offset?

Thanks,

Jun

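Here is a minimal sketch of the earliest-offset fallback discussed above. It
assumes the 0.7-era SimpleConsumer API (a host/port constructor and
getOffsetsBefore(topic, partition, time, maxNumOffsets)); the host, port, topic,
and partition values are placeholders:

import kafka.api.OffsetRequest
import kafka.consumer.SimpleConsumer

object OffsetWithFallback {
  def main(args: Array[String]): Unit = {
    // placeholder connection settings
    val consumer = new SimpleConsumer("broker-host", 9092, 30000, 64 * 1024)
    val topic = "low-volume-topic"
    val partition = 0
    val targetTime = System.currentTimeMillis - 24 * 60 * 60 * 1000L   // 24 hours ago

    // ask for offsets before the target time; on a single, still-active
    // segment this can come back empty, as discussed in this thread
    val byTime = consumer.getOffsetsBefore(topic, partition, targetTime, 1)
    val startOffset =
      if (byTime.nonEmpty) byTime(0)
      else consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime, 1)(0)

    println("starting offset = " + startOffset)
    consumer.close()
  }
}

With OffsetRequest.EarliestTime the broker takes startIndex = 0 in the code
quoted above, so the lookup returns the first segment's start offset and cannot
come back empty as long as the log has at least one segment.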