You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Susan Zhang <su...@gmail.com> on 2015/08/22 01:01:57 UTC
Streaming: BatchTime OffsetRange Mapping?
I've been reading documentation on accessing offsetRanges and updating ZK
yourself when using DirectKafkaInputDStream (from createDirectStream),
along with the code changes in this PR:
https://github.com/apache/spark/pull/4805.
I'm planning on adding a listener to update ZK (for monitoring purposes)
when batch completes. What would be a consistent manner to index the
offsets for a given batch? In the PR above, it seems like batchTime was
used, but is there a way to create this batchTime -> offsets in the
streaming app itself?
Something like:
var currOffsetRanges = Array[OffsetRange]()
directKafkaStream.transform { rdd =>
currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.foreachRDD { rdd =>
... /*DO STUFF*/
}
offsetMap += ((validTime, currOffsetRanges))
Then in the listener (onBatchComplete), retrieve corresponding offsetRanges
associated with the completed batchTime and update ZK accordingly.
I'm unsure how to define validTime above. Any help/advice would be
appreciated.
Thanks!