You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Klaus Schaefers <kl...@ligatus.com> on 2013/12/09 17:02:58 UTC

Fwd: Some Questions About Stream Partitions

Hi,

I have been reading about the Samza and I like the concept behind it a lot.
In particular the local key-value store is a good idea. However I have some
short questions regarding the local state that I couldn't answer while
reading the web page. I would be very happy if someone could answer them
shortly. Here they are:


1) In case of failure, will Samza restore the state automatically on
another node?

2) If I want to scale out and increase the number of stream partitions. How
is the local storage handled? Is it distributed by the framework as well
according to the partitions?


Cheers,

Klaus




-- 

-- 

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaefers@ligatus.com
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter



-- 

-- 

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaefers@ligatus.com
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter

Re: Some Questions About Stream Partitions

Posted by Jay Kreps <ja...@gmail.com>.
Hey Klaus,

These are good questions.

1. Yes. The stores.my-store.changelog=my-stream-name indicates that the
key-value store should be backed by a changelog. If this is set each change
to the local store is logged for fault-tolerance and the store will be
automatically restored on fail-over. If it is not set it is assumed the
task can restore its state from its inputs.

2. The concepts here are partitions, which are a logical notion of
parallelism and containers which are the physical notion of parallelism
(i.e. unix processes). Partitions are fixed, but containers are changable.
If you want to add containers later you do so just by changing the config
and restarting the job. So you will want to over-partition your job to
allow for future growth.

-Jay


On Mon, Dec 9, 2013 at 8:02 AM, Klaus Schaefers <klaus.schaefers@ligatus.com
> wrote:

> Hi,
>
> I have been reading about the Samza and I like the concept behind it a lot.
> In particular the local key-value store is a good idea. However I have some
> short questions regarding the local state that I couldn't answer while
> reading the web page. I would be very happy if someone could answer them
> shortly. Here they are:
>
>
> 1) In case of failure, will Samza restore the state automatically on
> another node?
>
> 2) If I want to scale out and increase the number of stream partitions. How
> is the local storage handled? Is it distributed by the framework as well
> according to the partitions?
>
>
> Cheers,
>
> Klaus
>
>
>
>
> --
>
> --
>
> Klaus Schaefers
> Senior Optimization Manager
>
> Ligatus GmbH
> Hohenstaufenring 30-32
> D-50674 Köln
>
> Tel.:  +49 (0) 221 / 56939 -784
> Fax:  +49 (0) 221 / 56 939 - 599
> E-Mail: klaus.schaefers@ligatus.com
> Web: www.ligatus.de
>
> HRB Köln 56003
> Geschäftsführung:
> Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> Dipl.-Wirtschaftsingenieur Arne Wolter
>
>
>
> --
>
> --
>
> Klaus Schaefers
> Senior Optimization Manager
>
> Ligatus GmbH
> Hohenstaufenring 30-32
> D-50674 Köln
>
> Tel.:  +49 (0) 221 / 56939 -784
> Fax:  +49 (0) 221 / 56 939 - 599
> E-Mail: klaus.schaefers@ligatus.com
> Web: www.ligatus.de
>
> HRB Köln 56003
> Geschäftsführung:
> Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> Dipl.-Wirtschaftsingenieur Arne Wolter
>

Re: Some Questions About Stream Partitions

Posted by Jay Kreps <ja...@gmail.com>.
Yes, exactly. Say you have N partitions and a single input. The number of
containers can be anything from 1 to N, and you can change it any time.


On Tue, Dec 10, 2013 at 2:27 AM, Klaus Schaefers <
klaus.schaefers@ligatus.com> wrote:

> Hi,
>
> thx for the detailed answers. But one more question, what do you mean be
> "over-partition"?  Do you mean I would initially define lets say 100
> partitions and then just assign 2 containers? When I need to scale out I
> would just add more containers and Samza would the also redistribute the
> state store? What if I want to reduce the number of containers (e.g.
> hosting on EC2), can Samza merge the state stores from several containers
> to one?
>
> Cheers,
>
> Klaus
>
>
>
> On Mon, Dec 9, 2013 at 6:40 PM, Chris Riccomini <criccomini@linkedin.com
> >wrote:
>
> > Hi Klaush,
> >
> > Thanks for your interest. I'm happy to answer any questions you've got.
> >
> > 1) In case of failure, will Samza restore the state automatically on
> > another node?
> >
> > Yes. If you are using YARN
> > (job.factory.class=org.apache.samza.job.yarn.YarnJobFactory), then YARN
> > will see that a Samza container has failed. It will re-start the Samza
> > container on a machine in the YARN grid. This node might be the same node
> > on the grid, or it might be a new node. YARN makes that decision based on
> > the available resources in the grid. When the Samza container starts up,
> > Samza will restore the container's state to where it was before the
> > failure occurred. Once the state has been fully restored, the Samza
> > container will then start feeding your StreamTask new messages from the
> > input streams.
> >
> > 2) If I want to scale out and increase the number of stream partitions.
> How
> > is the local storage handled? Is it distributed by the framework as well
> > according to the partitions?
> >
> >
> > Samza's partitioning model is currently determined by the number of
> > partitions that a Samza jobs input streams have. A Samza job will always
> > have the max of the partition counts from all of its input streams. For
> > example, if a Samza job has two input streams: A and B, and stream A has
> 4
> > partitions, and stream B has 8, then the Samza job will have 8
> partitions.
> > The first four partitions (0-3) of the Samza job will receive messages
> > from both stream A and Stream b, and the last 4 partitions (4-7) will
> > receive messages ONLY from stream B. These partitions are run physically
> > inside of Samza "containers". Samza containers are assigned partitions
> > that they are responsible for processing. For example, if you had two
> > Samza containers, the first container would process 4 partitions, and the
> > second container would process the other four. With YARN, the number of
> > containers you have is defined by the config yarn.container.count. Right
> > now, you can never have more containers that input stream partitions.
> >
> > The state for each one of a Samza job's partitions is managed entirely
> > independently. That is, each Samza partition has its own state store
> > (LevelDb, if you're using samza-kv). So in the example above, if the
> > StreamTask were using a single key-value store, there would be 8 LevelDb
> > stores, one for each Samza partition. This allows us to move partitions
> > between containers. If you were to decide you wanted 3 containers instead
> > of 2, Samza would simply cease processing the partitions on the other two
> > containers, restore the state for the third container, and then begin
> > processing the partitions across all three containers. This model means
> > that the maximum parallelism you can get when processing is 1 partition
> > per container, and up to as many partitions as your input streams have.
> >
> > Samza does not support resizing an input stream's partition count right
> > now. Once you start a Samza job, the partition counts for the input
> > streams are assumed to be static. If you decide you need more
> parallelism,
> > you need to start a new Samza job with a different job.name, and
> > re-process all the input data again.
> >
> > Cheers,
> > Chris
> >
> > On 12/9/13 8:02 AM, "Klaus Schaefers" <kl...@ligatus.com>
> wrote:
> >
> > >Hi,
> > >
> > >I have been reading about the Samza and I like the concept behind it a
> > >lot.
> > >In particular the local key-value store is a good idea. However I have
> > >some
> > >short questions regarding the local state that I couldn't answer while
> > >reading the web page. I would be very happy if someone could answer them
> > >shortly. Here they are:
> > >
> > >
> > >1) In case of failure, will Samza restore the state automatically on
> > >another node?
> > >
> > >2) If I want to scale out and increase the number of stream partitions.
> > >How
> > >is the local storage handled? Is it distributed by the framework as well
> > >according to the partitions?
> > >
> > >
> > >Cheers,
> > >
> > >Klaus
> > >
> > >
> > >
> > >
> > >--
> > >
> > >--
> > >
> > >Klaus Schaefers
> > >Senior Optimization Manager
> > >
> > >Ligatus GmbH
> > >Hohenstaufenring 30-32
> > >D-50674 Köln
> > >
> > >Tel.:  +49 (0) 221 / 56939 -784
> > >Fax:  +49 (0) 221 / 56 939 - 599
> > >E-Mail: klaus.schaefers@ligatus.com
> > >Web: www.ligatus.de
> > >
> > >HRB Köln 56003
> > >Geschäftsführung:
> > >Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> > >Dipl.-Wirtschaftsingenieur Arne Wolter
> > >
> > >
> > >
> > >--
> > >
> > >--
> > >
> > >Klaus Schaefers
> > >Senior Optimization Manager
> > >
> > >Ligatus GmbH
> > >Hohenstaufenring 30-32
> > >D-50674 Köln
> > >
> > >Tel.:  +49 (0) 221 / 56939 -784
> > >Fax:  +49 (0) 221 / 56 939 - 599
> > >E-Mail: klaus.schaefers@ligatus.com
> > >Web: www.ligatus.de
> > >
> > >HRB Köln 56003
> > >Geschäftsführung:
> > >Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> > >Dipl.-Wirtschaftsingenieur Arne Wolter
> >
> >
>
>
> --
>
> --
>
> Klaus Schaefers
> Senior Optimization Manager
>
> Ligatus GmbH
> Hohenstaufenring 30-32
> D-50674 Köln
>
> Tel.:  +49 (0) 221 / 56939 -784
> Fax:  +49 (0) 221 / 56 939 - 599
> E-Mail: klaus.schaefers@ligatus.com
> Web: www.ligatus.de
>
> HRB Köln 56003
> Geschäftsführung:
> Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> Dipl.-Wirtschaftsingenieur Arne Wolter
>

Re: Some Questions About Stream Partitions

Posted by Klaus Schaefers <kl...@ligatus.com>.
Hi,

thx for the detailed answers. But one more question, what do you mean be
"over-partition"?  Do you mean I would initially define lets say 100
partitions and then just assign 2 containers? When I need to scale out I
would just add more containers and Samza would the also redistribute the
state store? What if I want to reduce the number of containers (e.g.
hosting on EC2), can Samza merge the state stores from several containers
to one?

Cheers,

Klaus



On Mon, Dec 9, 2013 at 6:40 PM, Chris Riccomini <cr...@linkedin.com>wrote:

> Hi Klaush,
>
> Thanks for your interest. I'm happy to answer any questions you've got.
>
> 1) In case of failure, will Samza restore the state automatically on
> another node?
>
> Yes. If you are using YARN
> (job.factory.class=org.apache.samza.job.yarn.YarnJobFactory), then YARN
> will see that a Samza container has failed. It will re-start the Samza
> container on a machine in the YARN grid. This node might be the same node
> on the grid, or it might be a new node. YARN makes that decision based on
> the available resources in the grid. When the Samza container starts up,
> Samza will restore the container's state to where it was before the
> failure occurred. Once the state has been fully restored, the Samza
> container will then start feeding your StreamTask new messages from the
> input streams.
>
> 2) If I want to scale out and increase the number of stream partitions. How
> is the local storage handled? Is it distributed by the framework as well
> according to the partitions?
>
>
> Samza's partitioning model is currently determined by the number of
> partitions that a Samza jobs input streams have. A Samza job will always
> have the max of the partition counts from all of its input streams. For
> example, if a Samza job has two input streams: A and B, and stream A has 4
> partitions, and stream B has 8, then the Samza job will have 8 partitions.
> The first four partitions (0-3) of the Samza job will receive messages
> from both stream A and Stream b, and the last 4 partitions (4-7) will
> receive messages ONLY from stream B. These partitions are run physically
> inside of Samza "containers". Samza containers are assigned partitions
> that they are responsible for processing. For example, if you had two
> Samza containers, the first container would process 4 partitions, and the
> second container would process the other four. With YARN, the number of
> containers you have is defined by the config yarn.container.count. Right
> now, you can never have more containers that input stream partitions.
>
> The state for each one of a Samza job's partitions is managed entirely
> independently. That is, each Samza partition has its own state store
> (LevelDb, if you're using samza-kv). So in the example above, if the
> StreamTask were using a single key-value store, there would be 8 LevelDb
> stores, one for each Samza partition. This allows us to move partitions
> between containers. If you were to decide you wanted 3 containers instead
> of 2, Samza would simply cease processing the partitions on the other two
> containers, restore the state for the third container, and then begin
> processing the partitions across all three containers. This model means
> that the maximum parallelism you can get when processing is 1 partition
> per container, and up to as many partitions as your input streams have.
>
> Samza does not support resizing an input stream's partition count right
> now. Once you start a Samza job, the partition counts for the input
> streams are assumed to be static. If you decide you need more parallelism,
> you need to start a new Samza job with a different job.name, and
> re-process all the input data again.
>
> Cheers,
> Chris
>
> On 12/9/13 8:02 AM, "Klaus Schaefers" <kl...@ligatus.com> wrote:
>
> >Hi,
> >
> >I have been reading about the Samza and I like the concept behind it a
> >lot.
> >In particular the local key-value store is a good idea. However I have
> >some
> >short questions regarding the local state that I couldn't answer while
> >reading the web page. I would be very happy if someone could answer them
> >shortly. Here they are:
> >
> >
> >1) In case of failure, will Samza restore the state automatically on
> >another node?
> >
> >2) If I want to scale out and increase the number of stream partitions.
> >How
> >is the local storage handled? Is it distributed by the framework as well
> >according to the partitions?
> >
> >
> >Cheers,
> >
> >Klaus
> >
> >
> >
> >
> >--
> >
> >--
> >
> >Klaus Schaefers
> >Senior Optimization Manager
> >
> >Ligatus GmbH
> >Hohenstaufenring 30-32
> >D-50674 Köln
> >
> >Tel.:  +49 (0) 221 / 56939 -784
> >Fax:  +49 (0) 221 / 56 939 - 599
> >E-Mail: klaus.schaefers@ligatus.com
> >Web: www.ligatus.de
> >
> >HRB Köln 56003
> >Geschäftsführung:
> >Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> >Dipl.-Wirtschaftsingenieur Arne Wolter
> >
> >
> >
> >--
> >
> >--
> >
> >Klaus Schaefers
> >Senior Optimization Manager
> >
> >Ligatus GmbH
> >Hohenstaufenring 30-32
> >D-50674 Köln
> >
> >Tel.:  +49 (0) 221 / 56939 -784
> >Fax:  +49 (0) 221 / 56 939 - 599
> >E-Mail: klaus.schaefers@ligatus.com
> >Web: www.ligatus.de
> >
> >HRB Köln 56003
> >Geschäftsführung:
> >Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> >Dipl.-Wirtschaftsingenieur Arne Wolter
>
>


-- 

-- 

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaefers@ligatus.com
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter

Re: Some Questions About Stream Partitions

Posted by Chris Riccomini <cr...@linkedin.com>.
Hi Klaush,

Thanks for your interest. I'm happy to answer any questions you've got.

1) In case of failure, will Samza restore the state automatically on
another node?

Yes. If you are using YARN
(job.factory.class=org.apache.samza.job.yarn.YarnJobFactory), then YARN
will see that a Samza container has failed. It will re-start the Samza
container on a machine in the YARN grid. This node might be the same node
on the grid, or it might be a new node. YARN makes that decision based on
the available resources in the grid. When the Samza container starts up,
Samza will restore the container's state to where it was before the
failure occurred. Once the state has been fully restored, the Samza
container will then start feeding your StreamTask new messages from the
input streams.

2) If I want to scale out and increase the number of stream partitions. How
is the local storage handled? Is it distributed by the framework as well
according to the partitions?


Samza's partitioning model is currently determined by the number of
partitions that a Samza jobs input streams have. A Samza job will always
have the max of the partition counts from all of its input streams. For
example, if a Samza job has two input streams: A and B, and stream A has 4
partitions, and stream B has 8, then the Samza job will have 8 partitions.
The first four partitions (0-3) of the Samza job will receive messages
from both stream A and Stream b, and the last 4 partitions (4-7) will
receive messages ONLY from stream B. These partitions are run physically
inside of Samza "containers". Samza containers are assigned partitions
that they are responsible for processing. For example, if you had two
Samza containers, the first container would process 4 partitions, and the
second container would process the other four. With YARN, the number of
containers you have is defined by the config yarn.container.count. Right
now, you can never have more containers that input stream partitions.

The state for each one of a Samza job's partitions is managed entirely
independently. That is, each Samza partition has its own state store
(LevelDb, if you're using samza-kv). So in the example above, if the
StreamTask were using a single key-value store, there would be 8 LevelDb
stores, one for each Samza partition. This allows us to move partitions
between containers. If you were to decide you wanted 3 containers instead
of 2, Samza would simply cease processing the partitions on the other two
containers, restore the state for the third container, and then begin
processing the partitions across all three containers. This model means
that the maximum parallelism you can get when processing is 1 partition
per container, and up to as many partitions as your input streams have.

Samza does not support resizing an input stream's partition count right
now. Once you start a Samza job, the partition counts for the input
streams are assumed to be static. If you decide you need more parallelism,
you need to start a new Samza job with a different job.name, and
re-process all the input data again.

Cheers,
Chris

On 12/9/13 8:02 AM, "Klaus Schaefers" <kl...@ligatus.com> wrote:

>Hi,
>
>I have been reading about the Samza and I like the concept behind it a
>lot.
>In particular the local key-value store is a good idea. However I have
>some
>short questions regarding the local state that I couldn't answer while
>reading the web page. I would be very happy if someone could answer them
>shortly. Here they are:
>
>
>1) In case of failure, will Samza restore the state automatically on
>another node?
>
>2) If I want to scale out and increase the number of stream partitions.
>How
>is the local storage handled? Is it distributed by the framework as well
>according to the partitions?
>
>
>Cheers,
>
>Klaus
>
>
>
>
>-- 
>
>-- 
>
>Klaus Schaefers
>Senior Optimization Manager
>
>Ligatus GmbH
>Hohenstaufenring 30-32
>D-50674 Köln
>
>Tel.:  +49 (0) 221 / 56939 -784
>Fax:  +49 (0) 221 / 56 939 - 599
>E-Mail: klaus.schaefers@ligatus.com
>Web: www.ligatus.de
>
>HRB Köln 56003
>Geschäftsführung:
>Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
>Dipl.-Wirtschaftsingenieur Arne Wolter
>
>
>
>-- 
>
>-- 
>
>Klaus Schaefers
>Senior Optimization Manager
>
>Ligatus GmbH
>Hohenstaufenring 30-32
>D-50674 Köln
>
>Tel.:  +49 (0) 221 / 56939 -784
>Fax:  +49 (0) 221 / 56 939 - 599
>E-Mail: klaus.schaefers@ligatus.com
>Web: www.ligatus.de
>
>HRB Köln 56003
>Geschäftsführung:
>Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
>Dipl.-Wirtschaftsingenieur Arne Wolter