Posted to commits@samza.apache.org by "Jakob Homan (JIRA)" <ji...@apache.org> on 2013/11/15 21:19:23 UTC

[jira] [Created] (SAMZA-86) GetOffset:getNextOffset needs some work

Jakob Homan created SAMZA-86:
--------------------------------

             Summary: GetOffset:getNextOffset needs some work
                 Key: SAMZA-86
                 URL: https://issues.apache.org/jira/browse/SAMZA-86
             Project: Samza
          Issue Type: Improvement
            Reporter: Jakob Homan


{code}
  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
    val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
    val offsetResponse = sc.getOffsetsBefore(offsetRequest)
    val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))

    info("Got offset %d for topic and partition %s" format (autoOffset, tp))

    val actualOffset = Option(lastCheckpointedOffset) match {
      case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
      case None => autoOffset
    }
{code} 
Coercing lastCheckpointedOffset into an Option inside the method is ugly, particularly since at least one QA job is passing in a null as the value, which hits the Some case. It would be better to make the parameter an Option at the method level, so that callers are explicit about what they pass in.
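
A rough sketch of what the proposed signature could look like. This is not the actual Samza code: the object name GetOffsetSketch, the autoOffset parameter, and the parseOffset helper are hypothetical stand-ins for the Kafka lookups so that the snippet compiles on its own. One thing worth keeping in mind is that Option(null) evaluates to None in Scala, so a value that lands in the Some case has to be a non-null reference (for example a literal "null" string, which is only a guess at what the QA job sends).

```scala
// Hypothetical, simplified sketch: the Kafka calls from the real method are
// replaced by plain parameters so the example is self-contained.
object GetOffsetSketch {

  // The caller states explicitly whether a checkpointed offset exists.
  def getNextOffset(
      autoOffset: Long,                                  // stand-in for the broker's auto-offset lookup
      lastCheckpointedOffset: Option[String],            // explicit Option instead of a nullable String
      useLastCheckpointedOffset: String => Option[Long]  // stand-in for the real helper
  ): Long =
    lastCheckpointedOffset
      .flatMap(useLastCheckpointedOffset) // None if there is no checkpoint or it is unusable
      .getOrElse(autoOffset)              // fall back to the auto offset in either case

  // Stand-in helper (an assumption): accepts only strings that parse as a Long.
  def parseOffset(s: String): Option[Long] =
    scala.util.Try(s.toLong).toOption
}
```

With a signature along these lines, a caller that has no checkpoint passes None, and a caller holding a possibly-null String has to wrap it with Option(...) at the call site, where the decision is visible.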


