Posted to user@spark.apache.org by Charles Chao <ch...@bluecava.com> on 2016/01/22 20:30:11 UTC

Use KafkaRDD to Batch Process Messages from Kafka

Hi,

I have been using DirectKafkaInputDStream in Spark Streaming to consume Kafka messages, and it has been working very well. Now I need to batch process messages from Kafka: for example, retrieve all messages every hour, process them, and write the output to destinations such as Hive or HDFS. I would like to use KafkaRDD and a normal Spark job to achieve this, so that much of the logic in my streaming code can be reused.

In the excellent blog post Exactly-Once Spark Streaming from Apache Kafka, there are code examples that use KafkaRDD. However, it requires an array of OffsetRange, each of which must specify the start and end offsets.

My question is, should I write additional code to talk to Kafka and retrieve the latest offset for each partition every time this hourly job is run? Or is there any way to let KafkaUtils know to "read till latest" when creating the KafkaRDD?

Thanks,

Charles


Re: Use KafkaRDD to Batch Process Messages from Kafka

Posted by Charles Chao <xp...@gmail.com>.
Thanks a lot for the help! I'll definitely check out
KafkaCluster.scala. I'll probably first try to use that API from Java, and later
try to build the subproject.

thanks,

Charles

On Fri, Jan 22, 2016 at 12:26 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Yes, you should query Kafka if you want to know the latest available
> offsets.
>
> There's code to make this straightforward in KafkaCluster.scala, but the
> interface isn't public. There's an outstanding pull request to expose the
> API at
>
> https://issues.apache.org/jira/browse/SPARK-10963
>
> but frankly it appears unlikely that a committer will merge it.
>
> Your options are:
> - use that API from Java (since javac apparently doesn't respect Scala
>   privacy)
> - apply that patch or its equivalent and rebuild (just the
>   spark-streaming-kafka subproject, you don't have to redeploy Spark)
> - write / find equivalent code yourself
>
> If you want to build a patched version of the subproject and need a hand,
> just ask on the list.

Re: Use KafkaRDD to Batch Process Messages from Kafka

Posted by Cody Koeninger <co...@koeninger.org>.
Yes, you should query Kafka if you want to know the latest available
offsets.

There's code to make this straightforward in KafkaCluster.scala, but the
interface isn't public. There's an outstanding pull request to expose the
API at

https://issues.apache.org/jira/browse/SPARK-10963

but frankly it appears unlikely that a committer will merge it.

Your options are:
- use that API from Java (since javac apparently doesn't respect Scala
  privacy)
- apply that patch or its equivalent and rebuild (just the
  spark-streaming-kafka subproject, you don't have to redeploy Spark)
- write / find equivalent code yourself

If you want to build a patched version of the subproject and need a hand,
just ask on the list.
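
Whichever option you pick for querying Kafka, the bookkeeping for the hourly job is roughly the same: remember where the previous run stopped for each partition, ask Kafka for the latest offsets, and turn the pair into one range per partition. Below is a minimal, self-contained Java sketch of that step only. It does not talk to Kafka or Spark. The class OffsetRangePlanner and its nested Range type are hypothetical stand-ins (Range just mirrors the topic / partition / fromOffset / untilOffset shape of Spark's OffsetRange), and the two maps are assumed to be filled in by whatever code you use to load your saved offsets and to fetch the latest offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: plans per-partition offset ranges for one batch run. */
final class OffsetRangePlanner {

    /** Stand-in for Spark's OffsetRange: fromOffset is inclusive, untilOffset is exclusive. */
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Number of messages covered by this range. */
        long count() {
            return untilOffset - fromOffset;
        }
    }

    /**
     * Builds one range per partition that has new messages.
     *
     * @param lastProcessed untilOffset saved by the previous run, keyed by partition
     * @param latest        latest offsets reported by Kafka, keyed by partition
     */
    static List<Range> plan(String topic,
                            Map<Integer, Long> lastProcessed,
                            Map<Integer, Long> latest) {
        List<Range> ranges = new ArrayList<>();
        // TreeMap gives a stable, partition-ordered result.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(latest).entrySet()) {
            int partition = e.getKey();
            long until = e.getValue();
            // Assumption: a partition never seen before starts at offset 0.
            // A real job should ask Kafka for the earliest available offset instead,
            // because retention may already have deleted older messages.
            long from = lastProcessed.getOrDefault(partition, 0L);
            if (from > until) {
                throw new IllegalStateException(
                    "Saved offset " + from + " is ahead of latest offset " + until
                        + " for partition " + partition);
            }
            if (from < until) {
                ranges.add(new Range(topic, partition, from, until));
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<Integer, Long> lastProcessed = new TreeMap<>();
        lastProcessed.put(0, 100L);
        lastProcessed.put(1, 50L);

        Map<Integer, Long> latest = new TreeMap<>();
        latest.put(0, 250L);
        latest.put(1, 50L);  // nothing new, so no range is produced
        latest.put(2, 10L);  // partition not seen by the previous run

        for (Range r : plan("events", lastProcessed, latest)) {
            System.out.println(r.topic + "[" + r.partition + "] "
                + r.fromOffset + " -> " + r.untilOffset + " (" + r.count() + " messages)");
        }
    }
}
```

In a real job, each planned range would be converted to an OffsetRange and the resulting array passed to KafkaUtils.createRDD. After the output has been written successfully, the untilOffset of each range is what you would save as the starting point for the next hourly run.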

