Posted to dev@samza.apache.org by Paul <ts...@gmail.com> on 2020/12/17 16:05:20 UTC

Manipulating Starting Offsets for Input Streams

Hi All,

I am using the Samza low-level API and would like to manipulate the offsets for my Kafka input topics so that I can reprocess data from a previous point in time.

It looks like the functionality to do this was introduced by SEP-18. Does anyone know of a code example showing how to use this feature?

Thanks.

Re: Manipulating Starting Offsets for Input Streams

Posted by Lakshmi Manasa <la...@gmail.com>.
Hi Paul,

  Similar to the CheckpointTool (see Manipulating checkpoints manually
<https://samza.apache.org/learn/documentation/latest/container/checkpointing.html>),
a StartpointTool can be written. org.apache.samza.checkpoint.CheckpointTool
takes config.properties and offset.properties as input files, which determine
the job & its config and the new offsets. A StartpointTool will need the same
two input files, but the second one will map SSPs to new Startpoints.
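
For example, the second input file could map each SSP to a startpoint type
and value, along these lines (a hypothetical format -- the key layout and the
type:value encoding below are just one possible choice, not an existing Samza
convention):

systems.kafka.streams.my-input-topic.partitions.0=timestamp:1608192000000
systems.kafka.streams.my-input-topic.partitions.1=specific:54321
systems.kafka.streams.my-input-topic.partitions.2=oldest
systems.kafka.streams.my-input-topic.partitions.3=upcoming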

Attaching a barebones tool based on CheckpointTool code.

regards,
Manasa

StartpointTool.scala

// imports assume the same package layout that org.apache.samza.checkpoint.CheckpointTool uses
import java.util.Properties

import joptsimple.OptionSet
import org.apache.samza.config.Config
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.execution.JobPlanner
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.startpoint.{Startpoint, StartpointManager}
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{CommandLine, ConfigUtil, Logging}

object StartpointTool {

  type SSPToStartpointMap = Map[SystemStreamPartition, Startpoint]

  class SPToolCommandLine extends CommandLine with Logging {
    var newOffsets: SSPToStartpointMap = _

    def parseOffsets(propertiesFile: Properties): SSPToStartpointMap = {
      // similar to org.apache.samza.checkpoint.CheckpointTool.CheckpointToolCommandLine.parseOffsets,
      // but producing startpoints instead of checkpoints.
      // Startpoints can be a specific offset, a timestamp, oldest, or upcoming --
      // see org.apache.samza.startpoint.{StartpointOldest, StartpointTimestamp, ..}.
      // Based on the format of the second input file to the tool, parse out the
      // startpoint type and value and add an entry keyed by SSP.
      ???
    }

    override def loadConfig(options: OptionSet): Config = {
      // similar to org.apache.samza.checkpoint.CheckpointTool.CheckpointToolCommandLine.loadConfig:
      // read the startpoints properties file passed on the command line (option handling elided)
      newOffsets = parseOffsets(..)
      super.loadConfig(options)
    }
  }

  def apply(config: Config, offsets: SSPToStartpointMap): StartpointTool = {
    val metadataStore: CoordinatorStreamStore =
      new CoordinatorStreamStore(config, new MetricsRegistryMap())
    metadataStore.init()
    new StartpointTool(offsets, metadataStore, config)
  }

  def main(args: Array[String]) {
    // pretty much the same as CheckpointTool.main
    val cmdline = new SPToolCommandLine

    val options = cmdline.parser.parse(args: _*)
    val userConfig = cmdline.loadConfig(options)
    val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
    val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig)

    val tool = StartpointTool(rewrittenConfig, cmdline.newOffsets)

    tool.run()
  }
}

class StartpointTool(newOffsets: StartpointTool.SSPToStartpointMap,
                     coordinatorStreamStore: CoordinatorStreamStore,
                     userDefinedConfig: Config) extends Logging {

  def run() {
    val startpointManager: StartpointManager = new StartpointManager(coordinatorStreamStore)
    startpointManager.start()
    // write a startpoint for each SSP from newOffsets -- the taskname is not
    // really needed, so the SSP-only variant of writeStartpoint is enough
    newOffsets.foreach { case (ssp, startpoint) =>
      startpointManager.writeStartpoint(ssp, startpoint)
    }
    startpointManager.stop()
    coordinatorStreamStore.close()
  }

}
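
For the Startpoint values themselves, the SEP-18 classes in
org.apache.samza.startpoint can be constructed directly. Below is a minimal
sketch of building the newOffsets map by hand; the "kafka" system name and
"my-input-topic" are placeholders, and it assumes StartpointTimestamp /
StartpointSpecificOffset take the epoch-millis timestamp and offset string in
their constructors:

import org.apache.samza.Partition
import org.apache.samza.startpoint.{Startpoint, StartpointOldest, StartpointSpecificOffset}
import org.apache.samza.startpoint.{StartpointTimestamp, StartpointUpcoming}
import org.apache.samza.system.SystemStreamPartition

val ssp0 = new SystemStreamPartition("kafka", "my-input-topic", new Partition(0))
val ssp1 = new SystemStreamPartition("kafka", "my-input-topic", new Partition(1))

// each SSP can get a different startpoint type
val byTimestamp = new StartpointTimestamp(1608192000000L) // resume from a point in time (epoch millis)
val byOffset    = new StartpointSpecificOffset("54321")   // resume from an exact offset
val fromOldest  = new StartpointOldest()                  // resume from the earliest available offset
val fromNewest  = new StartpointUpcoming()                // skip ahead to only newly arriving messages

val newOffsets: Map[SystemStreamPartition, Startpoint] =
  Map(ssp0 -> byTimestamp, ssp1 -> fromOldest)

This is the kind of map that parseOffsets above would produce, and run() then
writes each entry with startpointManager.writeStartpoint(ssp, startpoint).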


On Thu, Dec 17, 2020 at 8:05 AM Paul <ts...@gmail.com> wrote:

> Hi All,
>
> I am using the Samza low-level API and would like to manipulate the offsets
> for my Kafka input topics so that I can reprocess data from a previous point
> in time.
>
> It looks like the functionality to do this was introduced by SEP-18. Does
> anyone know of a code example showing how to use this feature?
>
> Thanks.