You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Paul <ts...@gmail.com> on 2020/12/17 16:05:20 UTC
Manipulating Starting Offsets for Input Streams
Hi All,
I am using the Samza low level API and would like to be able to manipulate the offsets for my Kafka input topics to be able to reprocess data from a previous point in time.
It looks like the functionality to do this was introduced by SEP-18. Does anyone know of any code example showing how to use this feature?
Thanks.
Re: Manipulating Starting Offsets for Input Streams
Posted by Lakshmi Manasa <la...@gmail.com>.
Hi Paul,
Similar to CheckpointTool (see "Manipulating checkpoints manually":
<https://samza.apache.org/learn/documentation/latest/container/checkpointing.html>),
a StartpointTool can be written. The
org.apache.samza.checkpoint.CheckpointTool takes config.properties
and offset.properties as input files, which determine the job and its config
and the new offsets. A StartpointTool will need the same two input files, but
the second one will map each SSP (SystemStreamPartition) to its new Startpoint.
Attaching a barebones tool based on CheckpointTool code.
regards,
Manasa
StartpointTool.scala
/**
 * Command-line entry point and factory for [[StartpointTool]]: a barebones tool, modelled on
 * org.apache.samza.checkpoint.CheckpointTool, that writes new startpoints for a job's
 * input partitions.
 */
object StartpointTool {
  /** The startpoints to write, keyed by the system-stream-partition each one applies to. */
  type SSPToStartpointMap = Map[SystemStreamPartition, Startpoint]

  /**
   * Command line for the tool. On top of the standard job-config options it accepts
   * `--new-offsets`, the URI of a properties file describing the startpoints to write.
   *
   * The properties-file format used here is one reasonable choice (adapt as needed):
   * {{{
   * systems.<system>.streams.<stream>.partitions.<partition>=oldest
   * systems.<system>.streams.<stream>.partitions.<partition>=upcoming
   * systems.<system>.streams.<stream>.partitions.<partition>=offset:<offset>
   * systems.<system>.streams.<stream>.partitions.<partition>=timestamp:<epoch millis>
   * }}}
   */
  class SPToolCommandLine extends CommandLine with Logging {
    val newOffsetsOpt: joptsimple.ArgumentAcceptingOptionSpec[java.net.URI] =
      parser.accepts("new-offsets", "URI of file (e.g. file:///some/local/path.properties) " +
          "containing the startpoints to write for the job's input partitions.")
        .withRequiredArg
        .ofType(classOf[java.net.URI])
        .describedAs("path")

    // Starts empty rather than null so that running without --new-offsets writes nothing.
    var newOffsets: SSPToStartpointMap = Map.empty

    // Same key shape as CheckpointTool's SSP regex, minus the task-name prefix.
    private val SspKey = """systems\.(.+)\.streams\.(.+)\.partitions\.([0-9]+)""".r

    /**
     * Turns the entries of the startpoint properties file into a map of startpoints.
     * Entries whose key or value is not recognised are skipped with a warning.
     *
     * @param propertiesFile the loaded contents of the `--new-offsets` file
     * @return one startpoint per recognised system-stream-partition key
     */
    def parseOffsets(propertiesFile: Properties): SSPToStartpointMap = {
      import scala.collection.JavaConverters._

      propertiesFile.stringPropertyNames().asScala.toSeq.flatMap { key =>
        val spec = propertiesFile.getProperty(key).trim
        val parsed: Option[(SystemStreamPartition, Startpoint)] = key match {
          case SspKey(system, stream, partition) =>
            for {
              // [0-9]+ can still overflow Int, so parse defensively.
              partitionId <- scala.util.Try(partition.toInt).toOption
              startpoint <- parseStartpoint(spec)
            } yield {
              val ssp = new SystemStreamPartition(system, stream,
                new org.apache.samza.Partition(partitionId))
              (ssp, startpoint)
            }
          case _ => None
        }
        if (parsed.isEmpty) {
          warn("Ignoring unrecognised property: %s = %s" format (key, spec))
        }
        parsed
      }.toMap
    }

    /** Parses a single property value (`oldest`, `upcoming`, `offset:X`, `timestamp:T`). */
    private def parseStartpoint(spec: String): Option[Startpoint] = spec.split(":", 2) match {
      case Array("oldest") =>
        Some(new org.apache.samza.startpoint.StartpointOldest())
      case Array("upcoming") =>
        Some(new org.apache.samza.startpoint.StartpointUpcoming())
      case Array("offset", offset) if offset.nonEmpty =>
        Some(new org.apache.samza.startpoint.StartpointSpecific(offset))
      case Array("timestamp", millis) =>
        scala.util.Try(java.lang.Long.valueOf(millis)).toOption
          .map(ts => new org.apache.samza.startpoint.StartpointTimestamp(ts))
      case _ =>
        None
    }

    /**
     * Loads the job config and, when `--new-offsets` was given, also loads and parses the
     * startpoint file into [[newOffsets]].
     */
    override def loadConfig(options: OptionSet): Config = {
      val config = super.loadConfig(options)
      if (options.has(newOffsetsOpt)) {
        val properties = new Properties()
        val input = new java.io.FileInputStream(options.valueOf(newOffsetsOpt).getPath)
        try properties.load(input) finally input.close()
        newOffsets = parseOffsets(properties)
      }
      config
    }
  }

  /**
   * Builds a tool backed by an initialised coordinator stream store for the given job.
   *
   * @param config  the fully resolved job config
   * @param offsets the startpoints to write
   */
  def apply(config: Config, offsets: SSPToStartpointMap): StartpointTool = {
    val metadataStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
    metadataStore.init()
    new StartpointTool(offsets, metadataStore, config)
  }

  /** Parses the command line, resolves the job config and runs the tool. */
  def main(args: Array[String]): Unit = {
    // Pretty much the same flow as CheckpointTool.main.
    val cmdline = new SPToolCommandLine
    val options = cmdline.parser.parse(args: _*)
    val userConfig = cmdline.loadConfig(options)
    val jobConfig = JobPlanner.generateSingleJobConfig(userConfig)
    val rewrittenConfig = ConfigUtil.rewriteConfig(jobConfig)
    val tool = StartpointTool(rewrittenConfig, cmdline.newOffsets)
    tool.run()
  }
}
/**
 * Writes a set of startpoints to a job's coordinator stream.
 *
 * @param newOffsets             the startpoint to write for each system-stream-partition
 * @param coordinatorStreamStore an already-initialised store for the job's coordinator stream;
 *                               it is closed when [[run]] finishes
 * @param userDefinedConfig      the job config (currently unused; kept for parity with
 *                               CheckpointTool's constructor)
 */
class StartpointTool(newOffsets: StartpointTool.SSPToStartpointMap,
                     coordinatorStreamStore: CoordinatorStreamStore,
                     userDefinedConfig: Config) extends Logging {

  /**
   * Writes every entry of `newOffsets` through a StartpointManager, then stops the manager
   * and closes the coordinator stream store, even if a write fails.
   */
  def run(): Unit = {
    val startpointManager = new StartpointManager(coordinatorStreamStore)
    try {
      startpointManager.start()
      try {
        // Tolerate a null map (e.g. no startpoint file was supplied) by writing nothing.
        val startpoints =
          Option(newOffsets).getOrElse(Map.empty[SystemStreamPartition, Startpoint])
        startpoints.foreach { case (ssp, startpoint) =>
          // Written per SSP; the task name is not needed for this overload.
          info("Writing startpoint %s for %s" format (startpoint, ssp))
          startpointManager.writeStartpoint(ssp, startpoint)
        }
      } finally {
        startpointManager.stop()
      }
    } finally {
      coordinatorStreamStore.close()
    }
  }
}
On Thu, Dec 17, 2020 at 8:05 AM Paul <ts...@gmail.com> wrote:
> Hi All,
>
> I am using the Samza low level API and would like to be able to manipulate
> the offsets for my Kafka input topics to be able to reprocess data from a
> previous point in time.
>
> It looks like the functionality to do this was introduced by SEP-18. Does
> anyone know of any code example showing how to use this feature?
>
> Thanks.