Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/07/28 14:48:17 UTC
Spark Streaming: get an individual Kafka message's offset and partition number
Hi,
I am processing Kafka messages using Spark Streaming 1.3.
I am using the mapPartitions function to process the Kafka messages.
How can I access the offset (and partition number) of each individual message being processed?
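import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

JavaPairInputDStream<byte[], byte[]> directKafkaStream =
    KafkaUtils.createDirectStream(..); // arguments elided

JavaDStream<String> processed = directKafkaStream.mapPartitions(
    new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() {
      public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> t)
          throws Exception {
        List<String> results = new ArrayList<String>();
        while (t.hasNext()) {
          Tuple2<byte[], byte[]> tuple = t.next();
          byte[] key = tuple._1();
          byte[] msg = tuple._2();
          // How do I get the Kafka partition number and offset of this message?
        }
        // In Spark 1.x, FlatMapFunction.call must return an Iterable
        return results;
      }
    });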
Re: Spark Streaming: get an individual Kafka message's offset and partition number
Posted by Cody Koeninger <co...@koeninger.org>.
You don't need a separate package to get access to the offsets.
Shushant, have you read the available documentation at
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
or watched
https://www.youtube.com/watch?v=fXnNEq1v3VA
Each Spark partition corresponds one-to-one to a Kafka partition: the offset
range at index i of the batch's offset ranges gives the topic, Kafka partition
number and offsets for Spark partition i. The messages for a given partition
are in offset order without gaps (for a topic without log compaction), so a
message's offset is the range's starting offset plus the message's position
within the partition. Alternatively, you can use the messageHandler argument
to KafkaUtils.createDirectStream to access the full MessageAndMetadata,
including partition and offset, for each message.
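A minimal, Spark-free sketch of the offset-range arithmetic, assuming a topic without log compaction (so offsets within a partition have no gaps). The Range class is a hypothetical stand-in for Spark's OffsetRange (topic, partition, inclusive fromOffset, exclusive untilOffset), and OffsetSketch and annotate are illustrative names, not Spark API. In a real job the ranges would presumably come from casting the batch RDD to HasOffsetRanges as the first operation on the direct stream (for example inside foreachRDD or transform), and the Spark partition index from mapPartitionsWithIndex or TaskContext.get().partitionId().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OffsetSketch {

  // Hypothetical stand-in for Spark's OffsetRange; in a real job these values
  // would come from ((HasOffsetRanges) rdd.rdd()).offsetRanges().
  static final class Range {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    Range(String topic, int partition, long fromOffset, long untilOffset) {
      this.topic = topic;
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }
  }

  // Tags each message of Spark partition `sparkPartition` with its Kafka topic,
  // Kafka partition and offset. Assumes ranges[i] describes Spark partition i
  // and that messages arrive in offset order with no gaps.
  static List<String> annotate(int sparkPartition, Range[] ranges, Iterator<String> messages) {
    Range r = ranges[sparkPartition];
    List<String> out = new ArrayList<String>();
    long offset = r.fromOffset;  // first message in the range has offset fromOffset
    while (messages.hasNext()) {
      String msg = messages.next();
      out.add(r.topic + "/" + r.partition + "@" + offset + ": " + msg);
      offset++;  // no gaps assumed: each next message has the next offset
    }
    // Sanity check: the message count must match the size of the range.
    if (offset != r.untilOffset) {
      throw new IllegalStateException("expected " + (r.untilOffset - r.fromOffset)
          + " messages, got " + (offset - r.fromOffset));
    }
    return out;
  }

  public static void main(String[] args) {
    Range[] ranges = {
        new Range("events", 3, 100L, 102L),  // Spark partition 0 reads Kafka partition 3
        new Range("events", 0, 57L, 58L)     // Spark partition 1 reads Kafka partition 0
    };
    List<String> p0 = annotate(0, ranges, Arrays.asList("a", "b").iterator());
    System.out.println(p0);
  }
}
```

Running main should print [events/3@100: a, events/3@101: b]. Spark partition 0 reads Kafka partition 3 here, which is why the Kafka partition number is looked up from the range rather than taken from the Spark index. The comparison with untilOffset is a deliberate sanity check: if a partition yields fewer messages than its range covers (for example on a compacted topic), position-based offsets would be wrong, so the sketch fails instead. The messageHandler approach reads each message's real offset from MessageAndMetadata and does not rely on that assumption.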
Re: Spark Streaming: get an individual Kafka message's offset and partition number
Posted by Dibyendu Bhattacharya <di...@gmail.com>.
If you want the offset of individual Kafka messages, you can use this
consumer from Spark Packages:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
Regards,
Dibyendu