Posted to dev@samza.apache.org by "Bae, Jae Hyeon" <me...@gmail.com> on 2015/02/06 22:26:39 UTC

How to create a single task per container

We currently use Samza mainly as a data pipeline, so we don't want to
create multiple tasks in a single SamzaContainer. From my reading of the
Samza implementation, it creates as many tasks as there are partitions
assigned to the container, right?

The problem with that approach is that each task gets a separate buffer,
which is unnecessary in our use case. So I wrote the following
SystemStreamPartitionGrouper:

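import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.system.SystemStreamPartition;

public class SingleTaskSystemStreamPartition implements SystemStreamPartitionGrouper {
  // Puts every SystemStreamPartition into one task named after the first stream.
  @Override
  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    return new ImmutableMap.Builder<TaskName, Set<SystemStreamPartition>>()
        .put(new TaskName(ssps.iterator().next().getStream()), ssps)
        .build();
  }
}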

The above worked with yarn.container.count=1, but when I increased that
number, the containers couldn't start because they weren't assigned any
SystemStreamPartitions.

Could you guide me on how to write a SystemStreamPartitionGrouper that
creates a single task per SamzaContainer?

Thank you
Best, Jae

Re: How to create a single task per container

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jae,

You're correct. You'll need to write a custom SystemStreamPartitionGrouper.
You can use the config object in getSystemStreamPartitionGrouper to pull
out your container count (yarn.container.count), and group the SSPs into
that many tasks. This guarantees that you'll have exactly as many tasks as
containers (i.e. one task per container).
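
The grouping described above can be sketched in plain Java. This is only
an illustration under stated assumptions, not Samza code: Ssp is a
hypothetical stand-in for SystemStreamPartition, task names are plain
strings rather than TaskName objects, and taskCount stands for the value
that would be read from yarn.container.count. Partitions are assigned to
tasks by partition id modulo taskCount.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for Samza's SystemStreamPartition (illustration only).
final class Ssp {
  final String stream;
  final int partition;

  Ssp(String stream, int partition) {
    this.stream = stream;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Ssp)) return false;
    Ssp other = (Ssp) o;
    return stream.equals(other.stream) && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return 31 * stream.hashCode() + partition;
  }

  @Override
  public String toString() {
    return stream + "-" + partition;
  }
}

final class ContainerCountGrouperSketch {
  // Groups SSPs into at most taskCount tasks, keyed by partition id modulo taskCount.
  // Assumption: a real grouper would take taskCount from yarn.container.count.
  static Map<String, Set<Ssp>> group(Set<Ssp> ssps, int taskCount) {
    if (taskCount < 1) {
      throw new IllegalArgumentException("taskCount must be >= 1");
    }
    Map<String, Set<Ssp>> tasks = new TreeMap<String, Set<Ssp>>();
    for (Ssp ssp : ssps) {
      String taskName = "Task " + (ssp.partition % taskCount);
      tasks.computeIfAbsent(taskName, k -> new LinkedHashSet<Ssp>()).add(ssp);
    }
    return tasks;
  }

  public static void main(String[] args) {
    Set<Ssp> ssps = new LinkedHashSet<Ssp>();
    for (int p = 0; p < 4; p++) {
      ssps.add(new Ssp("events", p));
    }
    // Four partitions of one stream split across two tasks.
    System.out.println(group(ssps, 2));
  }
}
```

With four partitions of a single stream and taskCount=2, partitions 0 and
2 land in the first task and partitions 1 and 3 in the second. Note that
if there are fewer partitions than containers, this sketch produces fewer
tasks than containers.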

Cheers,
Chris

On Fri, Feb 6, 2015 at 2:39 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> I am reading JobCoordinator and now I can understand why multiple
> containers were not launched. I need to create multiple tasks, which are
> grouped again based on containerCount.

Re: How to create a single task per container

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
I am reading JobCoordinator and now I can understand why multiple
containers were not launched. I need to create multiple tasks, which are
grouped again based on containerCount.
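
To illustrate why a single task cannot fill more than one container,
here is a simplified model of that second grouping step. It is a sketch,
not the JobCoordinator code: the class and method names are hypothetical,
and round-robin assignment of task names to containers is an assumption
made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TaskToContainerSketch {
  // Deals task names out to containerCount containers in round-robin order.
  // Assumption: this models the idea only; Samza's real logic may differ.
  static List<List<String>> assign(List<String> taskNames, int containerCount) {
    List<List<String>> containers = new ArrayList<List<String>>();
    for (int c = 0; c < containerCount; c++) {
      containers.add(new ArrayList<String>());
    }
    for (int i = 0; i < taskNames.size(); i++) {
      containers.get(i % containerCount).add(taskNames.get(i));
    }
    return containers;
  }

  public static void main(String[] args) {
    // One task, two containers: the second container has nothing to run.
    System.out.println(assign(Arrays.asList("events"), 2));
    // Two tasks, two containers: one task each.
    System.out.println(assign(Arrays.asList("Task 0", "Task 1"), 2));
  }
}
```

With one task and two containers, the second container's task list is
empty, which matches the symptom of containers that never receive a
SystemStreamPartition. With as many tasks as containers, each container
gets exactly one task.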
