Posted to user@spark.apache.org by p pathiyil <pa...@gmail.com> on 2016/02/11 14:59:56 UTC

Spark Streaming with Kafka: Dealing with 'slow' partitions

Hi,

I am looking at a way to isolate the processing of messages from each Kafka
partition within the same driver.

Scenario: A DStream is created with the createDirectStream call by passing
in a few partitions. Let us say that the streaming context is defined with
a batch duration of 2 seconds. If the processing of messages from a
single partition takes more than 2 seconds (while all the others finish
much quicker), it seems that the next set of jobs gets scheduled only after
the processing of that last partition finishes. This means that the delay
affects all partitions and not just the partition that was truly the
cause of the delay. What I would like to do is to have the delay only
impact the 'slow' partition.

I tried creating one DStream per partition and then doing a union of all
of them (similar to the sample in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
but that didn't seem to help.

Please suggest the correct approach to solve this issue.

Thanks,
Praveen.
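
The scenario above can be sketched with a small pure-Python model (this is not Spark code). It assumes that each batch runs as one job, that a job ends only when its slowest partition ends, and that only one job runs at a time. The function name and the per-partition timings are made up for illustration.

```python
def batch_delays(batch_interval, partition_seconds, num_batches):
    """Return the scheduling delay (in seconds) of each batch.

    Model assumptions (not taken from Spark itself): a batch's job ends when
    its slowest partition ends, and the next job starts only after that.
    """
    job_duration = max(partition_seconds)
    delays = []
    free_at = 0.0  # time at which the single job slot becomes free
    for i in range(num_batches):
        ready_at = (i + 1) * batch_interval  # batch i is ready at the end of its interval
        start = max(ready_at, free_at)
        delays.append(start - ready_at)
        free_at = start + job_duration
    return delays


# Hypothetical timings: 2-second batches, four partitions.
balanced = batch_delays(2.0, [0.5, 0.5, 0.5, 0.5], 5)
skewed = batch_delays(2.0, [0.5, 0.5, 0.5, 3.0], 5)
print(balanced)  # [0.0, 0.0, 0.0, 0.0, 0.0]
print(skewed)    # [0.0, 1.0, 2.0, 3.0, 4.0]
```

In this model a single 3-second partition makes every batch start later and later, so the fast partitions see the same growing delay as the slow one.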

Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

Posted by p pathiyil <pa...@gmail.com>.
Thanks Sebastian.

I was indeed trying out FAIR scheduling with a high value for
concurrentJobs today.

It does improve the latency seen by the non-hot partitions, even if it does
not provide complete isolation. So it might be an acceptable middle ground.
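
For reference, a minimal sketch of how the two settings mentioned here could be passed to spark-submit. The property names spark.streaming.concurrentJobs and spark.scheduler.mode are real Spark properties; the value 4 and the helper name to_submit_args are only illustrative assumptions, and any fair scheduler pool configuration is not shown.

```python
# Settings discussed in this thread; the concurrency value is an example only.
settings = {
    "spark.streaming.concurrentJobs": "4",  # hypothetical value, tune for your workload
    "spark.scheduler.mode": "FAIR",
}


def to_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(settings)))
# --conf spark.scheduler.mode=FAIR --conf spark.streaming.concurrentJobs=4
```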
On 12 Feb 2016 12:18, "Sebastian Piu" <se...@gmail.com> wrote:

> Have you tried using the fair scheduler and queues?

Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

Posted by Sebastian Piu <se...@gmail.com>.
Have you tried using the fair scheduler and queues?
On 12 Feb 2016 4:24 a.m., "p pathiyil" <pa...@gmail.com> wrote:

> With this setting, I can see that the next job is being executed before
> the previous one is finished. However, the processing of the 'hot'
> partition eventually hogs all the concurrent jobs. If there was a way to
> restrict jobs to be one per partition, then this setting would provide the
> per-partition isolation.
>
> Is there anything in the framework which would give control over that
> aspect?
>
> Thanks.

Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

Posted by p pathiyil <pa...@gmail.com>.
With this setting, I can see that the next job is being executed before the
previous one is finished. However, the processing of the 'hot' partition
eventually hogs all the concurrent jobs. If there was a way to restrict
jobs to be one per partition, then this setting would provide the
per-partition isolation.

Is there anything in the framework which would give control over that
aspect?

Thanks.
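
A rough way to picture the behaviour described above is the pure-Python model below (not Spark code). It assumes every batch's job contains the hot partition and therefore holds a job slot for the hot partition's full processing time, and that jobs start in batch order as slots free up. The function name and the timings (2-second batches, a 7-second hot partition) are hypothetical.

```python
import heapq


def delays_with_concurrency(batch_interval, job_seconds, concurrent_jobs, num_batches):
    """Return each batch's start delay when up to `concurrent_jobs` jobs may overlap."""
    slots = [0] * concurrent_jobs  # time at which each job slot becomes free
    heapq.heapify(slots)
    delays = []
    for i in range(num_batches):
        ready_at = (i + 1) * batch_interval
        free_at = heapq.heappop(slots)  # earliest available slot
        start = max(ready_at, free_at)
        delays.append(start - ready_at)
        heapq.heappush(slots, start + job_seconds)
    return delays


print(delays_with_concurrency(2, 7, 1, 8))  # [0, 5, 10, 15, 20, 25, 30, 35]
print(delays_with_concurrency(2, 7, 3, 8))  # [0, 0, 0, 1, 1, 1, 2, 2]
```

With three slots the delay grows much more slowly, but once every slot is held by a job waiting on the hot partition, later batches still queue up, so the other partitions are not fully isolated.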


On Thu, Feb 11, 2016 at 9:55 PM, Cody Koeninger <co...@koeninger.org> wrote:

> spark.streaming.concurrentJobs
>
>
> see e.g. http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
>
>
> On Thu, Feb 11, 2016 at 9:33 AM, p pathiyil <pa...@gmail.com> wrote:
>
>> Thanks for the response Cody.
>>
>> The producers are out of my control, so I can't really balance the incoming
>> content across the various topics and partitions. The number of topics and
>> partitions is quite large and the volume across them is not very well known
>> ahead of time. So it is quite hard to segregate low and high volume topics
>> into separate driver programs.
>>
>> Will look at shuffle / repartition.
>>
>> Could you share the setting for starting another batch in parallel? It
>> might be ok to call the 'save' of the processed messages out of order if
>> that is the only consequence of this setting.
>>
>> When separate DStreams are created per partition (and if union() is not
>> called on them), what aspect of the framework still ties the scheduling of
>> jobs across the partitions together? Asking this to see if creating
>> multiple threads in the driver and calling createDirectStream per partition
>> in those threads can provide isolation.
>>
>>
>>
>> On Thu, Feb 11, 2016 at 8:14 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> The real way to fix this is by changing partitioning, so you don't have
>>> a hot partition.  It would be better to do this at the time you're
>>> producing messages, but you can also do it with a shuffle / repartition
>>> during consuming.
>>>
>>> There is a setting to allow another batch to start in parallel, but
>>> that's likely to have unintended consequences.

Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

Posted by Cody Koeninger <co...@koeninger.org>.
The real way to fix this is by changing partitioning, so you don't have a
hot partition.  It would be better to do this at the time you're producing
messages, but you can also do it with a shuffle / repartition during
consuming.

There is a setting to allow another batch to start in parallel, but that's
likely to have unintended consequences.
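
As a rough illustration of why a shuffle / repartition helps, here is a pure-Python model (not the Spark API). It assumes processing cost is proportional to the record count, that repartitioning spreads records evenly across tasks, and it ignores the cost of the shuffle itself. The record counts, the per-record cost, and the function names are invented for the example.

```python
def slowest_task_ms(records_per_task, ms_per_record):
    """A job is only as fast as its most heavily loaded task."""
    return max(records_per_task) * ms_per_record


def repartition_evenly(records_per_task, num_tasks):
    """Spread the same total number of records as evenly as possible."""
    base, extra = divmod(sum(records_per_task), num_tasks)
    return [base + 1 if i < extra else base for i in range(num_tasks)]


# Hypothetical batch: three quiet Kafka partitions and one hot one.
kafka_partitions = [200, 200, 200, 3000]
before = slowest_task_ms(kafka_partitions, 1)
after = slowest_task_ms(repartition_evenly(kafka_partitions, 4), 1)
print(before, after)  # 3000 900
```

The total work is unchanged; only the longest task shrinks, which is what determines when the batch finishes.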

On Thu, Feb 11, 2016 at 7:59 AM, p pathiyil <pa...@gmail.com> wrote:

> Hi,
>
> I am looking at a way to isolate the processing of messages from each
> Kafka partition within the same driver.
>
> Scenario: A DStream is created with the createDirectStream call by passing
> in a few partitions. Let us say that the streaming context is defined to
> have a time duration of 2 seconds. If the processing of messages from a
> single partition takes more than 2 seconds (while all the others finish
> much quicker), it seems that the next set of jobs get scheduled only after
> the processing of that last partition. This means that the delay is
> effective for all partitions and not just the partition that was truly the
> cause of the delay. What I would like to do is to have the delay only
> impact the 'slow' partition.
>
> Tried to create one DStream per partition and then do a union of all
> partitions, (similar to the sample in
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-batch-processing-times),
> but that didn't seem to help.
>
> Please suggest the correct approach to solve this issue.
>
> Thanks,
> Praveen.
>