You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Srikanth <sr...@gmail.com> on 2016/07/22 16:29:44 UTC

Rebalancing when adding kafka partitions

Hello,

I'd like to understand how Spark Streaming(direct) would handle Kafka
partition addition?
Will a running job be aware of new partitions and read from it?
Since it uses Kafka APIs to query offsets and offsets are handled
internally.

Srikanth

Re: Rebalancing when adding kafka partitions

Posted by Cody Koeninger <co...@koeninger.org>.
The underlying kafka consumer

On Tue, Aug 16, 2016 at 2:17 PM, Srikanth <sr...@gmail.com> wrote:
> Yes, SubscribePattern detects new partition. Also, it has a comment saying
>
>> Subscribe to all topics matching specified pattern to get dynamically
>> assigned partitions.
>>  * The pattern matching will be done periodically against topics existing
>> at the time of check.
>>  * @param pattern pattern to subscribe to
>>  * @param kafkaParams Kafka
>
>
> Who does the new partition discover? Underlying kafka consumer or
> spark-streaming-kafka-0-10-assembly??
>
> Srikanth
>
> On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> Hrrm, that's interesting. Did you try with subscribe pattern, out of
>> curiosity?
>>
>> I haven't tested repartitioning on the  underlying new Kafka consumer, so
>> its possible I misunderstood something.
>>
>> On Aug 12, 2016 2:47 PM, "Srikanth" <sr...@gmail.com> wrote:
>>>
>>> I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
>>> Partition was increased using "bin/kafka-topics.sh --alter" after spark
>>> job was started.
>>> I don't see messages from new partitions in the DStream.
>>>
>>>>     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>>>>         ssc, PreferConsistent, Subscribe[Array[Byte],
>>>> Array[Byte]](topics, kafkaParams) )
>>>>     .map(r => (r.key(), r.value()))
>>>
>>>
>>> Also, no.of partitions did not increase too.
>>>>
>>>>     dataStream.foreachRDD( (rdd, curTime) => {
>>>>      logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
>>>
>>>
>>> Should I be setting some parameter/config? Is the doc for new integ
>>> available?
>>>
>>> Thanks,
>>> Srikanth
>>>
>>> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>>
>>>> No, restarting from a checkpoint won't do it, you need to re-define the
>>>> stream.
>>>>
>>>> Here's the jira for the 0.10 integration
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-12177
>>>>
>>>> I haven't gotten docs completed yet, but there are examples at
>>>>
>>>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>>>>
>>>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
>>>> > In Spark 1.x, if we restart from a checkpoint, will it read from new
>>>> > partitions?
>>>> >
>>>> > If you can, pls point us to some doc/link that talks about Kafka 0.10
>>>> > integ
>>>> > in Spark 2.0.
>>>> >
>>>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org>
>>>> > wrote:
>>>> >>
>>>> >> For the integration for kafka 0.8, you are literally starting a
>>>> >> streaming job against a fixed set of topicapartitions,  It will not
>>>> >> change throughout the job, so you'll need to restart the spark job if
>>>> >> you change kafka partitions.
>>>> >>
>>>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>>>> >> or subscribepattern, it should pick up new partitions as they are
>>>> >> added.
>>>> >>
>>>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com>
>>>> >> wrote:
>>>> >> > Hello,
>>>> >> >
>>>> >> > I'd like to understand how Spark Streaming(direct) would handle
>>>> >> > Kafka
>>>> >> > partition addition?
>>>> >> > Will a running job be aware of new partitions and read from it?
>>>> >> > Since it uses Kafka APIs to query offsets and offsets are handled
>>>> >> > internally.
>>>> >> >
>>>> >> > Srikanth
>>>> >
>>>> >
>>>
>>>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Rebalancing when adding kafka partitions

Posted by Srikanth <sr...@gmail.com>.
Yes, SubscribePattern detects new partition. Also, it has a comment saying

Subscribe to all topics matching specified pattern to get dynamically
> assigned partitions.
>  * The pattern matching will be done periodically against topics existing
> at the time of check.
>  * @param pattern pattern to subscribe to
>  * @param kafkaParams Kafka


Who does the new partition discover? Underlying kafka consumer or
spark-streaming-kafka-0-10-assembly??

Srikanth

On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Hrrm, that's interesting. Did you try with subscribe pattern, out of
> curiosity?
>
> I haven't tested repartitioning on the  underlying new Kafka consumer, so
> its possible I misunderstood something.
> On Aug 12, 2016 2:47 PM, "Srikanth" <sr...@gmail.com> wrote:
>
>> I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
>> Partition was increased using "bin/kafka-topics.sh --alter" after spark
>> job was started.
>> I don't see messages from new partitions in the DStream.
>>
>>     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>>>         ssc, PreferConsistent, Subscribe[Array[Byte],
>>> Array[Byte]](topics, kafkaParams) )
>>>     .map(r => (r.key(), r.value()))
>>
>>
>> Also, no.of partitions did not increase too.
>>
>>>     dataStream.foreachRDD( (rdd, curTime) => {
>>>      logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
>>
>>
>> Should I be setting some parameter/config? Is the doc for new integ
>> available?
>>
>> Thanks,
>> Srikanth
>>
>> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> No, restarting from a checkpoint won't do it, you need to re-define the
>>> stream.
>>>
>>> Here's the jira for the 0.10 integration
>>>
>>> https://issues.apache.org/jira/browse/SPARK-12177
>>>
>>> I haven't gotten docs completed yet, but there are examples at
>>>
>>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>>>
>>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
>>> > In Spark 1.x, if we restart from a checkpoint, will it read from new
>>> > partitions?
>>> >
>>> > If you can, pls point us to some doc/link that talks about Kafka 0.10
>>> integ
>>> > in Spark 2.0.
>>> >
>>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>> >>
>>> >> For the integration for kafka 0.8, you are literally starting a
>>> >> streaming job against a fixed set of topicapartitions,  It will not
>>> >> change throughout the job, so you'll need to restart the spark job if
>>> >> you change kafka partitions.
>>> >>
>>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>>> >> or subscribepattern, it should pick up new partitions as they are
>>> >> added.
>>> >>
>>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com>
>>> wrote:
>>> >> > Hello,
>>> >> >
>>> >> > I'd like to understand how Spark Streaming(direct) would handle
>>> Kafka
>>> >> > partition addition?
>>> >> > Will a running job be aware of new partitions and read from it?
>>> >> > Since it uses Kafka APIs to query offsets and offsets are handled
>>> >> > internally.
>>> >> >
>>> >> > Srikanth
>>> >
>>> >
>>>
>>
>>

Re: Rebalancing when adding kafka partitions

Posted by Cody Koeninger <co...@koeninger.org>.
Hrrm, that's interesting. Did you try with subscribe pattern, out of
curiosity?

I haven't tested repartitioning on the  underlying new Kafka consumer, so
its possible I misunderstood something.
On Aug 12, 2016 2:47 PM, "Srikanth" <sr...@gmail.com> wrote:

> I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
> Partition was increased using "bin/kafka-topics.sh --alter" after spark
> job was started.
> I don't see messages from new partitions in the DStream.
>
>     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>>         ssc, PreferConsistent, Subscribe[Array[Byte],
>> Array[Byte]](topics, kafkaParams) )
>>     .map(r => (r.key(), r.value()))
>
>
> Also, no.of partitions did not increase too.
>
>>     dataStream.foreachRDD( (rdd, curTime) => {
>>      logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
>
>
> Should I be setting some parameter/config? Is the doc for new integ
> available?
>
> Thanks,
> Srikanth
>
> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> No, restarting from a checkpoint won't do it, you need to re-define the
>> stream.
>>
>> Here's the jira for the 0.10 integration
>>
>> https://issues.apache.org/jira/browse/SPARK-12177
>>
>> I haven't gotten docs completed yet, but there are examples at
>>
>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>>
>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
>> > In Spark 1.x, if we restart from a checkpoint, will it read from new
>> > partitions?
>> >
>> > If you can, pls point us to some doc/link that talks about Kafka 0.10
>> integ
>> > in Spark 2.0.
>> >
>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>> >>
>> >> For the integration for kafka 0.8, you are literally starting a
>> >> streaming job against a fixed set of topicapartitions,  It will not
>> >> change throughout the job, so you'll need to restart the spark job if
>> >> you change kafka partitions.
>> >>
>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>> >> or subscribepattern, it should pick up new partitions as they are
>> >> added.
>> >>
>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com>
>> wrote:
>> >> > Hello,
>> >> >
>> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
>> >> > partition addition?
>> >> > Will a running job be aware of new partitions and read from it?
>> >> > Since it uses Kafka APIs to query offsets and offsets are handled
>> >> > internally.
>> >> >
>> >> > Srikanth
>> >
>> >
>>
>
>

Re: Rebalancing when adding kafka partitions

Posted by Srikanth <sr...@gmail.com>.
I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
Partition was increased using "bin/kafka-topics.sh --alter" after spark job
was started.
I don't see messages from new partitions in the DStream.

    KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>         ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics,
> kafkaParams) )
>     .map(r => (r.key(), r.value()))


Also, no.of partitions did not increase too.

>     dataStream.foreachRDD( (rdd, curTime) => {
>      logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")


Should I be setting some parameter/config? Is the doc for new integ
available?

Thanks,
Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <co...@koeninger.org> wrote:

> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
>
> Here's the jira for the 0.10 integration
>
> https://issues.apache.org/jira/browse/SPARK-12177
>
> I haven't gotten docs completed yet, but there are examples at
>
> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>
> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
> > In Spark 1.x, if we restart from a checkpoint, will it read from new
> > partitions?
> >
> > If you can, pls point us to some doc/link that talks about Kafka 0.10
> integ
> > in Spark 2.0.
> >
> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
> >>
> >> For the integration for kafka 0.8, you are literally starting a
> >> streaming job against a fixed set of topicapartitions,  It will not
> >> change throughout the job, so you'll need to restart the spark job if
> >> you change kafka partitions.
> >>
> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> >> or subscribepattern, it should pick up new partitions as they are
> >> added.
> >>
> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com>
> wrote:
> >> > Hello,
> >> >
> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> >> > partition addition?
> >> > Will a running job be aware of new partitions and read from it?
> >> > Since it uses Kafka APIs to query offsets and offsets are handled
> >> > internally.
> >> >
> >> > Srikanth
> >
> >
>

Re: Rebalancing when adding kafka partitions

Posted by Cody Koeninger <co...@koeninger.org>.
Scaladoc is already in the code, just not the html docs

On Fri, Jul 22, 2016 at 1:46 PM, Srikanth <sr...@gmail.com> wrote:
> Yeah, that's what I thought. We need to redefine not just restart.
> Thanks for the info!
>
> I do see the usage of subscribe[K,V] in your DStreams example.
> Looks simple but its not very obvious how it works :-)
> I'll watch out for the docs and ScalaDoc.
>
> Srikanth
>
> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> No, restarting from a checkpoint won't do it, you need to re-define the
>> stream.
>>
>> Here's the jira for the 0.10 integration
>>
>> https://issues.apache.org/jira/browse/SPARK-12177
>>
>> I haven't gotten docs completed yet, but there are examples at
>>
>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>>
>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
>> > In Spark 1.x, if we restart from a checkpoint, will it read from new
>> > partitions?
>> >
>> > If you can, pls point us to some doc/link that talks about Kafka 0.10
>> > integ
>> > in Spark 2.0.
>> >
>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org>
>> > wrote:
>> >>
>> >> For the integration for kafka 0.8, you are literally starting a
>> >> streaming job against a fixed set of topicapartitions,  It will not
>> >> change throughout the job, so you'll need to restart the spark job if
>> >> you change kafka partitions.
>> >>
>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>> >> or subscribepattern, it should pick up new partitions as they are
>> >> added.
>> >>
>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com>
>> >> wrote:
>> >> > Hello,
>> >> >
>> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
>> >> > partition addition?
>> >> > Will a running job be aware of new partitions and read from it?
>> >> > Since it uses Kafka APIs to query offsets and offsets are handled
>> >> > internally.
>> >> >
>> >> > Srikanth
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Rebalancing when adding kafka partitions

Posted by Srikanth <sr...@gmail.com>.
Yeah, that's what I thought. We need to redefine not just restart.
Thanks for the info!

I do see the usage of subscribe[K,V] in your DStreams example.
Looks simple but its not very obvious how it works :-)
I'll watch out for the docs and ScalaDoc.

Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <co...@koeninger.org> wrote:

> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
>
> Here's the jira for the 0.10 integration
>
> https://issues.apache.org/jira/browse/SPARK-12177
>
> I haven't gotten docs completed yet, but there are examples at
>
> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>
> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
> > In Spark 1.x, if we restart from a checkpoint, will it read from new
> > partitions?
> >
> > If you can, pls point us to some doc/link that talks about Kafka 0.10
> integ
> > in Spark 2.0.
> >
> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
> >>
> >> For the integration for kafka 0.8, you are literally starting a
> >> streaming job against a fixed set of topicapartitions,  It will not
> >> change throughout the job, so you'll need to restart the spark job if
> >> you change kafka partitions.
> >>
> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> >> or subscribepattern, it should pick up new partitions as they are
> >> added.
> >>
> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com>
> wrote:
> >> > Hello,
> >> >
> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> >> > partition addition?
> >> > Will a running job be aware of new partitions and read from it?
> >> > Since it uses Kafka APIs to query offsets and offsets are handled
> >> > internally.
> >> >
> >> > Srikanth
> >
> >
>

Re: Rebalancing when adding kafka partitions

Posted by Cody Koeninger <co...@koeninger.org>.
No, restarting from a checkpoint won't do it, you need to re-define the stream.

Here's the jira for the 0.10 integration

https://issues.apache.org/jira/browse/SPARK-12177

I haven't gotten docs completed yet, but there are examples at

https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10

On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <sr...@gmail.com> wrote:
> In Spark 1.x, if we restart from a checkpoint, will it read from new
> partitions?
>
> If you can, pls point us to some doc/link that talks about Kafka 0.10 integ
> in Spark 2.0.
>
> On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> For the integration for kafka 0.8, you are literally starting a
>> streaming job against a fixed set of topicapartitions,  It will not
>> change throughout the job, so you'll need to restart the spark job if
>> you change kafka partitions.
>>
>> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>> or subscribepattern, it should pick up new partitions as they are
>> added.
>>
>> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com> wrote:
>> > Hello,
>> >
>> > I'd like to understand how Spark Streaming(direct) would handle Kafka
>> > partition addition?
>> > Will a running job be aware of new partitions and read from it?
>> > Since it uses Kafka APIs to query offsets and offsets are handled
>> > internally.
>> >
>> > Srikanth
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Rebalancing when adding kafka partitions

Posted by Srikanth <sr...@gmail.com>.
In Spark 1.x, if we restart from a checkpoint, will it read from new
partitions?

If you can, pls point us to some doc/link that talks about Kafka 0.10 integ
in Spark 2.0.

On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <co...@koeninger.org> wrote:

> For the integration for kafka 0.8, you are literally starting a
> streaming job against a fixed set of topicapartitions,  It will not
> change throughout the job, so you'll need to restart the spark job if
> you change kafka partitions.
>
> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> or subscribepattern, it should pick up new partitions as they are
> added.
>
> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com> wrote:
> > Hello,
> >
> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> > partition addition?
> > Will a running job be aware of new partitions and read from it?
> > Since it uses Kafka APIs to query offsets and offsets are handled
> > internally.
> >
> > Srikanth
>

Re: Rebalancing when adding kafka partitions

Posted by Cody Koeninger <co...@koeninger.org>.
For the integration for kafka 0.8, you are literally starting a
streaming job against a fixed set of topicapartitions,  It will not
change throughout the job, so you'll need to restart the spark job if
you change kafka partitions.

For the integration for kafka 0.10 / spark 2.0, if you use subscribe
or subscribepattern, it should pick up new partitions as they are
added.

On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <sr...@gmail.com> wrote:
> Hello,
>
> I'd like to understand how Spark Streaming(direct) would handle Kafka
> partition addition?
> Will a running job be aware of new partitions and read from it?
> Since it uses Kafka APIs to query offsets and offsets are handled
> internally.
>
> Srikanth

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org