Posted to user@spark.apache.org by Susan Zhang <su...@gmail.com> on 2015/08/22 01:01:57 UTC

Streaming: BatchTime OffsetRange Mapping?

I've been reading the documentation on accessing offsetRanges and updating ZK
yourself when using DirectKafkaInputDStream (from createDirectStream),
along with the code changes in this PR:
https://github.com/apache/spark/pull/4805.

I'm planning on adding a listener to update ZK (for monitoring purposes)
when a batch completes. What would be a consistent way to index the
offsets for a given batch? In the PR above, it seems like batchTime was
used, but is there a way to build this batchTime -> offsets mapping in the
streaming app itself?

Something like:

var currOffsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  ... /* DO STUFF */
}

offsetMap += ((validTime, currOffsetRanges))

Then in the listener (onBatchCompleted), retrieve the offsetRanges
associated with the completed batchTime and update ZK accordingly.
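Since the transform closure and the listener run on different threads, one way to keep the batchTime -> offsets bookkeeping safe is a concurrent map keyed by the batch time in milliseconds. A minimal sketch of that idea (here OffsetRange is a simplified stand-in for Spark's org.apache.spark.streaming.kafka.OffsetRange, and OffsetBookkeeping is a hypothetical name, not anything from the Spark API):

```scala
import scala.collection.concurrent.TrieMap

// Simplified stand-in for org.apache.spark.streaming.kafka.OffsetRange.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object OffsetBookkeeping {
  // Written from the transform side, read (and cleaned up) by the listener.
  private val offsetMap = TrieMap.empty[Long, Array[OffsetRange]]

  // Record the offsets seen for a batch, keyed by batch time in millis.
  def record(batchTimeMs: Long, ranges: Array[OffsetRange]): Unit =
    offsetMap.put(batchTimeMs, ranges)

  // Called from the listener with batchInfo.batchTime.milliseconds;
  // removing the entry keeps the map from growing without bound.
  def take(batchTimeMs: Long): Option[Array[OffsetRange]] =
    offsetMap.remove(batchTimeMs)
}
```

In onBatchCompleted the listener would look up the completed batch via batchCompleted.batchInfo.batchTime.milliseconds, and only then push the offsets to ZK.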

I'm unsure how to define validTime above. Any help/advice would be
appreciated.


Thanks!