Posted to user@helix.apache.org by matt hoffman <ma...@acm.org> on 2014/11/02 01:39:31 UTC

Modeling a pipeline in Helix

I'm doing a quick POC to write an event processing pipeline using Helix,
and I had a couple questions about the best way to model it.

For simplicity, these pipelines are made up of a linear set of stages, and
an item flows through each stage in order.

I'd like to be able to scale a particular pipeline up and down (3 instances
of Pipeline A on the cluster, for example).  That seems straightforward to
do in Helix -- it's just changing the number of replicas of a resource.

Sometimes particular stages have to run on a particular set of machines
(say, a stage that requires a GPU that is only on some machines in the
cluster, for example).  It looks like I would do that in Helix using a
SEMI_AUTO rebalancing mode.

For efficiency, I'd like Helix to try to group as many stages of the same
pipeline onto the same machine as possible.  I don't want it to spread the
stages across the cluster so that a single item has to hop from machine to
machine any more than necessary.  I'm not sure how best to model this in
Helix.

In Helix's world, does it make sense to model stages as "partitions" within
a pipeline "resource"?  Or should the stages be resources themselves?   And
if they are resources, can I define a constraint or rebalancing algorithm
that attempts to colocate them?   I see that the task execution recipe is
pretty similar to what I want, and it models the individual stages as
resources... so I'm guessing that's the best way to model it, but I don't
know enough about Helix yet to know the pros and cons.

I'm also assuming that the new "task" abstraction in Helix 0.7 probably
isn't what I want; it seems to be modeling something with a discrete
execution like a MapReduce mapper, as opposed to a stage that items flow
through.  Am I correct?

Thanks for any advice you can give!

matt

Re: Modeling a pipeline in Helix

Posted by Matt Hoffman <ma...@mhoffman.org>.
Thanks! That's very helpful.

I appreciate it!

Matt
On Nov 3, 2014 2:00 AM, "kishore g" <g....@gmail.com> wrote:

> Thanks for the info. That definitely helps.
>
> It would be better to model each stage as a resource, and you can have
> multiple partitions for each stage. To scale a stage, you can simply
> increase the number of partitions (note that you mentioned the same thing in
> your initial email but called them replicas). Each item can go to any of the
> partitions since the stage is stateless.
>
> GPU affinity for stages:
>
> This can be achieved by tagging the instances and the stage resources. For
> example, if node1, node2, and node3 have GPUs, you can tag them as GPU. When
> you create a stage (say, StageToBeRunOnGPU), you can specify the tag that it
> can be assigned to, and Helix will distribute its partitions only among the
> instances with that tag.
>
> Also note that each node can have more than one tag.
>
> If you use the tagging feature, the AUTO rebalancer mode should be good
> enough for your use case. If you need some custom placement strategy, you
> can always fall back to custom mode or write your own rebalancer.
>
> Hope that helps.
>
> thanks,
> Kishore G

Re: Modeling a pipeline in Helix

Posted by kishore g <g....@gmail.com>.
Thanks for the info. That definitely helps.

It would be better to model each stage as a resource, and you can have
multiple partitions for each stage. To scale a stage, you can simply
increase the number of partitions (note that you mentioned the same thing in
your initial email but called them replicas). Each item can go to any of the
partitions since the stage is stateless.

GPU affinity for stages:

This can be achieved by tagging the instances and the stage resources. For
example, if node1, node2, and node3 have GPUs, you can tag them as GPU. When
you create a stage (say, StageToBeRunOnGPU), you can specify the tag that it
can be assigned to, and Helix will distribute its partitions only among the
instances with that tag.

Also note that each node can have more than one tag.

If you use the tagging feature, the AUTO rebalancer mode should be good
enough for your use case. If you need some custom placement strategy, you
can always fall back to custom mode or write your own rebalancer.
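
A minimal sketch of the tag-based placement described above, written as a
plain Python simulation rather than against the Helix API. The function
assign_partitions, the ParseStage resource, the SSD tag, and the round-robin
spreading are all assumptions made for illustration; the real placement is
decided by Helix's rebalancer.

```python
from collections import defaultdict


def assign_partitions(stage, num_partitions, instance_tags, required_tag=None):
    """Spread a stage's partitions round-robin over the eligible instances.

    An instance is eligible if no tag is required, or if it carries the
    required tag. This is an illustrative stand-in, not Helix's algorithm.
    """
    eligible = sorted(
        name
        for name, tags in instance_tags.items()
        if required_tag is None or required_tag in tags
    )
    if not eligible:
        raise ValueError(f"no instance carries tag {required_tag!r}")

    placement = defaultdict(list)
    for p in range(num_partitions):
        placement[eligible[p % len(eligible)]].append(f"{stage}_{p}")
    return dict(placement)


# Hypothetical cluster: three GPU nodes and one node without a GPU.
instance_tags = {
    "node1": {"GPU"},
    "node2": {"GPU", "SSD"},  # an instance may carry more than one tag
    "node3": {"GPU"},
    "node4": set(),
}

# The GPU stage is restricted to tagged instances; the other stage is not.
gpu_stage = assign_partitions("StageToBeRunOnGPU", 6, instance_tags, required_tag="GPU")
cpu_stage = assign_partitions("ParseStage", 4, instance_tags)

print(gpu_stage)
print(cpu_stage)
```

In this sketch the six GPU partitions land two per node on node1, node2, and
node3, while node4 only receives a partition of the untagged stage.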

Hope that helps.

thanks,
Kishore G



On Sun, Nov 2, 2014 at 10:33 AM, Matt Hoffman <ma...@mhoffman.org> wrote:

> Kishore,
>
> Thanks for the quick response.  Some replies inline.
>
> On Sun, Nov 2, 2014 at 12:52 AM, kishore g <g....@gmail.com> wrote:
>
>> Hi Matt,
>>
>> When you say you want to scale a particular pipeline up/down by adding
>> replicas, do you mean that each replica will process a different set of
>> items? If that is the case, then you probably want to increase the number
>> of partitions and not the replicas.
>>
>
> The pipeline is stateless (at least, as it's implemented right now,
> without Helix), and we have a load balancer that round-robins requests to
> the various servers hosting a given pipeline.  I hadn't really thought
> about partitioning the items, but I suppose we could.
>
>
>> I am assuming these stages are long running. Is this assumption correct?
>>
>
> Depending on what you mean by "long-running", yes.  The stages are
> typically defined once and then remain available for months or years at a
> time, so in that sense they're long-running. An individual item passing
> through the stage typically takes only milliseconds. So you could think of
> it as a task that takes a few milliseconds to complete (where the task
> starts when the item comes in, and returns the transformed item when it
> stops) or as an indefinitely long-running task that processes an endless
> stream of items.  Since an individual stage is stateless, I think either
> way of thinking about it is equivalent.
>
>
>> What do you want to happen when a node/process fails?
>>
>>
> It's OK for now if the stages just start up on another node, future items
> go to the new node, and items in-flight on the failed node fail.
> Propagating the error back to the client will be interesting if the
> pipeline spans multiple nodes... I'm not sure how that would work, but I'm
> assuming that would be some mechanism outside of Helix.
> It would be better if we could fail over in-flight items, but that'd
> require us to be able to replay those items on the new node. We'd probably
> bring in something like Kafka for that.
>
> Does that make sense?
>
>
> matt
>
>

Re: Modeling a pipeline in Helix

Posted by Matt Hoffman <ma...@mhoffman.org>.
Kishore,

Thanks for the quick response.  Some replies inline.

On Sun, Nov 2, 2014 at 12:52 AM, kishore g <g....@gmail.com> wrote:

> Hi Matt,
>
> When you say you want to scale a particular pipeline up/down by adding
> replicas, do you mean that each replica will process a different set of
> items? If that is the case, then you probably want to increase the number
> of partitions and not the replicas.
>

The pipeline is stateless (at least, as it's implemented right now, without
Helix), and we have a load balancer that round-robins requests to the
various servers hosting a given pipeline.  I hadn't really thought about
partitioning the items, but I suppose we could.


> I am assuming these stages are long running. Is this assumption correct?
>

Depending on what you mean by "long-running", yes.  The stages are
typically defined once and then remain available for months or years at a
time, so in that sense they're long-running. An individual item passing
through the stage typically takes only milliseconds. So you could think of
it as a task that takes a few milliseconds to complete (where the task
starts when the item comes in, and returns the transformed item when it
stops) or as an indefinitely long-running task that processes an endless
stream of items.  Since an individual stage is stateless, I think either
way of thinking about it is equivalent.


> What do you want to happen when a node/process fails?
>
>
It's OK for now if the stages just start up on another node, future items
go to the new node, and items in-flight on the failed node fail.
Propagating the error back to the client will be interesting if the
pipeline spans multiple nodes... I'm not sure how that would work, but I'm
assuming that would be some mechanism outside of Helix.

It would be better if we could fail over in-flight items, but that'd
require us to be able to replay those items on the new node. We'd probably
bring in something like Kafka for that.
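
To make the replay idea concrete, here is a rough sketch under stated
assumptions: an in-memory log that remembers how many items have been
acknowledged, so a replacement node can pick up where the failed one stopped.
ReplayLog, run_stage, and the fail_after switch are hypothetical names for
illustration only; this is neither the Kafka API nor anything Helix provides.

```python
class ReplayLog:
    """Append-only list of items plus a count of acknowledged items."""

    def __init__(self):
        self._items = []
        self._committed = 0  # index of the first item not yet acknowledged

    def append(self, item):
        self._items.append(item)

    def pending(self):
        """Return a copy of the items that have not been acknowledged."""
        return list(self._items[self._committed:])

    def commit(self, count=1):
        self._committed += count


def run_stage(log, transform, fail_after=None):
    """Process pending items, acknowledging each one after it succeeds.

    fail_after simulates a node crash before the n-th pending item.
    """
    results = []
    for n, item in enumerate(log.pending()):
        if fail_after is not None and n == fail_after:
            raise RuntimeError("simulated node failure")
        results.append(transform(item))
        log.commit()
    return results


log = ReplayLog()
for value in range(5):
    log.append(value)

# The first node handles two items and then fails on the third.
try:
    run_stage(log, lambda x: x * 10, fail_after=2)
except RuntimeError:
    pass

# A replacement node replays only the unacknowledged items.
recovered = run_stage(log, lambda x: x * 10)
print(recovered)  # [20, 30, 40]
```

The results of the two items handled before the crash are not returned here;
the sketch only shows that the unacknowledged items are replayed.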

Does that make sense?


matt





Re: Modeling a pipeline in Helix

Posted by kishore g <g....@gmail.com>.
Hi Matt,

When you say you want to scale a particular pipeline up/down by adding
replicas, do you mean that each replica will process a different set of
items? If that is the case, then you probably want to increase the number
of partitions and not the replicas.
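
The distinction can be sketched with a small, simplified model (plain Python,
not the Helix API). Here build_layout and partition_for are hypothetical
helpers, and routing an item by its index modulo the partition count is just
one possible assumption for a stateless stage.

```python
def build_layout(num_partitions, num_replicas, nodes):
    """Return {partition: [nodes hosting a copy of that partition]}."""
    if num_replicas > len(nodes):
        raise ValueError("cannot place more replicas than there are nodes")
    return {
        p: [nodes[(p + r) % len(nodes)] for r in range(num_replicas)]
        for p in range(num_partitions)
    }


def partition_for(item_index, num_partitions):
    """Pick the partition that handles a given item."""
    return item_index % num_partitions


nodes = ["node1", "node2", "node3"]

# Three partitions, one replica each: the items are divided among partitions.
by_partitions = build_layout(3, 1, nodes)
# One partition, three replicas: three copies of the same unit of work.
by_replicas = build_layout(1, 3, nodes)

print(by_partitions)  # {0: ['node1'], 1: ['node2'], 2: ['node3']}
print(by_replicas)    # {0: ['node1', 'node2', 'node3']}
print([partition_for(i, 3) for i in range(6)])  # [0, 1, 2, 0, 1, 2]
print([partition_for(i, 1) for i in range(6)])  # [0, 0, 0, 0, 0, 0]
```

In this picture, adding partitions divides the items into more groups, while
adding replicas adds copies of the same partition.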

I am assuming these stages are long running. Is this assumption correct?

What do you want to happen when a node/process fails?

thanks,
Kishore G

