You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Ashish Soni <as...@gmail.com> on 2016/01/25 17:31:51 UTC

Determine Topic MetaData Spark Streaming Job

Hi All ,

What is the best way to tell spark streaming job for the no of partition to
to a given topic -

Should that be provided as a parameter or command line argument
or
We should connect to kafka in the driver program and query it

Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition,
Long>();
fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);

Thanks,
Ashish

Re: Determine Topic MetaData Spark Streaming Job

Posted by Gerard Maas <ge...@gmail.com>.
That's precisely what this constructor does:
KafkaUtils.createDirectStream[...](ssc,
kafkaConfig, topics)

Is there a reason to do that yourself?  In that case, look at how it's done
in Spark Streaming for inspiration:
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L204

-kr, Gerard.




On Mon, Jan 25, 2016 at 5:53 PM, Ashish Soni <as...@gmail.com> wrote:

> Correct what i am trying to achieve is that before the streaming job
> starts query the topic meta data from kafka , determine all the partition
> and provide those to direct API.
>
> So my question is should i consider passing all the partition from command
> line and query kafka and find and provide , what is the correct approach.
>
> Ashish
>
> On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas <ge...@gmail.com>
> wrote:
>
>> What are you trying to achieve?
>>
>> Looks like you want to provide offsets but you're not managing them
>> and I'm assuming you're using the direct stream approach.
>>
>> In that case, use the simpler constructor that takes the kafka config and
>> the topics. Let it figure it out the offsets (it will contact kafka and
>> request the partitions for the topics provided)
>>
>> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>>
>>  -kr, Gerard
>>
>> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni <as...@gmail.com>
>> wrote:
>>
>>> Hi All ,
>>>
>>> What is the best way to tell spark streaming job for the no of partition
>>> to to a given topic -
>>>
>>> Should that be provided as a parameter or command line argument
>>> or
>>> We should connect to kafka in the driver program and query it
>>>
>>> Map<TopicAndPartition, Long> fromOffsets = new
>>> HashMap<TopicAndPartition, Long>();
>>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>>
>>> Thanks,
>>> Ashish
>>>
>>
>>
>

Re: Determine Topic MetaData Spark Streaming Job

Posted by Ashish Soni <as...@gmail.com>.
Correct what i am trying to achieve is that before the streaming job starts
query the topic meta data from kafka , determine all the partition and
provide those to direct API.

So my question is should i consider passing all the partition from command
line and query kafka and find and provide , what is the correct approach.

Ashish

On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas <ge...@gmail.com> wrote:

> What are you trying to achieve?
>
> Looks like you want to provide offsets but you're not managing them
> and I'm assuming you're using the direct stream approach.
>
> In that case, use the simpler constructor that takes the kafka config and
> the topics. Let it figure it out the offsets (it will contact kafka and
> request the partitions for the topics provided)
>
> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>
>  -kr, Gerard
>
> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni <as...@gmail.com>
> wrote:
>
>> Hi All ,
>>
>> What is the best way to tell spark streaming job for the no of partition
>> to to a given topic -
>>
>> Should that be provided as a parameter or command line argument
>> or
>> We should connect to kafka in the driver program and query it
>>
>> Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition,
>> Long>();
>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>
>> Thanks,
>> Ashish
>>
>
>

Re: Determine Topic MetaData Spark Streaming Job

Posted by Gerard Maas <ge...@gmail.com>.
What are you trying to achieve?

Looks like you want to provide offsets but you're not managing them and I'm
assuming you're using the direct stream approach.

In that case, use the simpler constructor that takes the kafka config and
the topics. Let it figure it out the offsets (it will contact kafka and
request the partitions for the topics provided)

KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)

 -kr, Gerard

On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni <as...@gmail.com> wrote:

> Hi All ,
>
> What is the best way to tell spark streaming job for the no of partition
> to to a given topic -
>
> Should that be provided as a parameter or command line argument
> or
> We should connect to kafka in the driver program and query it
>
> Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition,
> Long>();
> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>
> Thanks,
> Ashish
>