Posted to dev@samza.apache.org by Roger Hoover <ro...@gmail.com> on 2014/09/25 01:55:19 UTC

How to deal with scaling?

Hi all,

So it seems like one of the first decisions that you have to make when
creating a Samza job is how many partitions to have in your input topics.
This will dictate how many tasks are created and how many changelog
partitions get created.  It's great that you can independently change the
number of Samza containers that get deployed but what do you do once you
reach the max (# containers == # tasks)?

If the job's input topics are partitioned by key, then you cannot add more
partitions without corrupting existing state.  Does this come up for people
in practice?  How do you handle it?

Just trying to think it through, it seems like you need a procedure
something like this:

1) Create new topics to hold the same data but with more partitions
(inputs, outputs, and changelog topics)
2) Deploy jobs to repartition inputs and changelog topics into the new
topics
3) When caught up, stop the running job
4) Change job config to point to new topics and restart the job (if all
topics are new, this can be done while previous job run is still active
using new job.id)
5) Change downstream jobs to use new output topic if necessary.  Doing this
in a safe way might be hard.
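
To make step #4 concrete, the change is mostly just re-pointing the job at
the new topics and bumping job.id.  Roughly (the topic names here are made
up; job.name, job.id, task.inputs and stores.*.changelog are, I believe, the
relevant config keys):

  # before
  job.name=my-job
  job.id=1
  task.inputs=kafka.input-topic
  stores.my-store.changelog=kafka.my-job-my-store-changelog

  # after, pointing at the wider topics created in step #1
  job.name=my-job
  job.id=2
  task.inputs=kafka.input-topic-repartitioned
  stores.my-store.changelog=kafka.my-job-my-store-changelog-repartitioned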

Ideally at some point, this process could be automated.  I was wondering
whether a generic task could be written for step #2 but I think it would
require a couple of constraints:

1) All meaningfully-partitioned topics would need to include their keys in
the stream.  In Kafka, this is optional unless you enable compaction but
for this to work generically, it would have to be mandatory in Samza for
any stream for which partitions have meaning (not using random or
round-robin partitioning).
2) The partition keys should be re-hashable based on their raw byte
representation so that the repartition task would not have to know how to
deserialize the keys in order to compute their new partition.  At first
glance, this doesn't seem too onerous but I saw in the Config Stream
proposal (SAMZA-348) that keys might be JSON:

{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000"
}

This would be problematic because the order of the dictionary keys can change
while the JSON still means the same thing, so equal keys are not guaranteed
to serialize to identical bytes.  In order to use JSON as a serde for keys,
you'd need to enforce a sort order on dictionaries.
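
If a canonical byte form were enforced, the repartition task could stay
completely generic.  As a sketch only (Jackson's ORDER_MAP_ENTRIES_BY_KEYS
to get deterministic map serialization, and a plain hash-mod over the raw
key bytes -- not necessarily the exact hash Kafka's partitioner uses):

  import java.util.Arrays;
  import java.util.Map;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.fasterxml.jackson.databind.SerializationFeature;

  public class CanonicalKey {
    // Serialize map entries in sorted key order so logically-equal keys
    // always produce identical bytes.
    private static final ObjectMapper MAPPER = new ObjectMapper()
        .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

    public static byte[] toBytes(Map<String, Object> key) throws Exception {
      return MAPPER.writeValueAsBytes(key);
    }

    // Pick a partition from the raw key bytes without deserializing them.
    public static int partition(byte[] keyBytes, int numPartitions) {
      return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
  }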

I'm curious what others do about this or what your thoughts are.  Thanks,

Roger

Re: How to deal with scaling?

Posted by Roger Hoover <ro...@gmail.com>.
Very helpful.  Thanks for sharing, Chris.

On Mon, Sep 29, 2014 at 1:08 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey Roger,
>
> Sorry for the late reply. Trying to load balance across multiple
> obligations.
>
> > Just to be more explicit about  "starting it from scratch".  The only
> >way to do this theoretically correctly, I think, would be to have the
> >newly partitioned job start with no state and playback it's input topics
> >from the beginning of time.
>
> Correct. Though, it's worth considering what "beginning of time" means.
> When you start any Samza job for the first time, you don't generally play
> it all data since the beginning of time. It usually just picks up from
> time T, and moves forward from there. In this case, I think the time T
> that you'd want to pick up from would probably depend on the logic of the
> job. It could be "now", "last checkpoint of the old job", "a week ago",
> etc.
>
> > This brings me to another question about deployment.  Do you recommend
> >having two separate Kafka clusters?  In the "public" cluster, brokers
> >would be deployed on machines by themselves. Then you have another Kafka
> >cluster for Samza in which the brokers are co-located with YARN
> >NodeManagers on each machine.  With this approach, Samza topologies would
> >consume from and ultimately publish to topics on the "public" cluster.
> >All of the internal topics like repartitioning, changelog, etc. would be
> >hidden away in the Kafka cluster dedicated to Samza.
>
> As Jakob said, this is how we've been running over the past couple of
> years. We use MirrorMaker to pull data back and forth between the two
> clusters (be careful for cycles, though). Recently, we moved the NMs off
> of the Kafka broker boxes for the Samza grid. The main reason for this was
> that the stateful Samza jobs were using page cache as a side-effect of
> interacting with the LevelDB state. This caused a degradation in
> performance for the Kafka brokers that were running along side the Samza
> jobs. Kafka is very page cache sensitive, since it uses it as an in-memory
> buffer for the most recent N minutes of messages. By pulling the jobs off
> into their own boxes, we were able to solve some performance issues that
> we were seeing with the Kafka brokers.
>
> The locality gains that we were seeing by running both jobs and brokers
> together was never measured. I believe it's probably pretty negligible
> (and degrades as the cluster size increases). Thoughts on locality are
> here:
>
> https://issues.apache.org/jira/browse/SAMZA-335
>
>
> Kafka's partitioning model does not lend itself all that well to locality
> optimization (vs. a block store like HDFS).
>
> Anyway, food for thought.
>
> Cheers,
> Chris
>
> On 9/26/14 11:20 AM, "Roger Hoover" <ro...@gmail.com> wrote:
>
> >Chris,
> >
> >Thanks for the great answers.  It's helping me clear up my thinking...
> >
> >On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
> >criccomini@linkedin.com.invalid> wrote:
> >
> >> Hey Roger,
> >>
> >> > If the job's input topics are partitioned by key, then you cannot add
> >> >more partitions without corrupting existing state.
> >>
> >> This is correct.
> >>
> >> > Does this come up for people in practice?
> >>
> >> It does come up occasionally for us. Thus far, we usually just run a
> >>Kafka
> >> topic-partition expansion (thereby trashing the semantics of the
> >> partitioning) and restart the job. Inconsistent output is then emitted
> >>for
> >> a while. We do this only when we agree that inconsistent output is
> >> tolerable.
> >>
> >
> >Thanks.  This might be a reasonable in many cases (not sure yet).
> >
> >
> >>
> >> Another thing we do for this is over-partition our Kafka topics when
> >>we're
> >> concerned about growth.
> >>
> >> Both of these solutions are admittedly hacky. As you said, the ideal
> >> solution would be some kind of automatic migration. It seems possible
> >>that
> >> the AM (job coordinator) might be able to manage this, especially of we
> >> had a pre-packaged "repartition job" that it could trigger. I haven't
> >> thought about this in detail, though.
> >>
> >> > Deploy jobs to repartition inputs and changelog topics into the new
> >> >topics
> >>
> >> The changelog topic seems problematic to me. It seems that they key used
> >> in the changelog might not always be directly related to the
> >>partitioning
> >> of the input topic. For example,  if you have a StreamTask that is
> >> consuming a single input partition, and keeping a count in the state
> >>store
> >> of all messages that it sees, how do you repartition this changelog? In
> >> the new world, the keys for the single partition that it's consuming
> >>could
> >> be spread across many different partitions, and the count is pretty much
> >> meaningless, since it can't be split up by key.
> >>
> >> It almost feels like state has to be totally reset to safely do an input
> >> partition expansion under all cases. In a sense, you have to treat the
> >>new
> >> job as a job that's completely new, and start it from scratch.
> >>
> >
> >Ah, you're right.  I think there's no way to migrate state in general.  If
> >a job is saving any kind of aggregate state then that's an irreversible
> >operation that was done on the old partition.  There's not enough
> >information to "repartition" the results.
> >
> >Just to be more explicit about  "starting it from scratch".  The only way
> >to do this theoretically correctly, I think, would be to have the newly
> >partitioned job start with no state and playback it's input topics from
> >the
> >beginning of time.
> >
> >
> >
> >>
> >> > Change job config to point to new topics and restart the job
> >>
> >> One problem with this is going to be the case where you don't control
> >>the
> >> producers for the old input topic. They'd either have to be migrated to
> >> produce to the new input topic for your job, or you'd have to
> >>permanently
> >> run the repartition job to move data from the original topic to the
> >> currently expanded topic. Keeping the repartition job is not all that
> >>wild
> >> of an idea. Most Samza topologies we run have some form of a repartition
> >> job that runs permanently at the beginning of their flow.
> >>
> >
> >I was thinking about repartitioning as a good design pattern as well.
> >Having your job always repartition the input decouples it from the it's
> >upstream topic dependencies.  This brings me to another question about
> >deployment.  Do you recommend having two separate Kafka clusters?  In the
> >"public" cluster, brokers would be deployed on machines by themselves.
> >Then you have another Kafka cluster for Samza in which the brokers are
> >co-located with YARN NodeManagers on each machine.  With this approach,
> >Samza topologies would consume from and ultimately publish to topics on
> >the
> >"public" cluster.  All of the internal topics like repartitioning,
> >changelog, etc. would be hidden away in the Kafka cluster dedicated to
> >Samza.
> >
> >
> >>
> >> > All meaningfully-partitioned topics would need to include their keys
> >>in
> >> >the stream
> >>
> >> True. Somewhat tangential to this is the case where the key that's been
> >> used is not the one your job wishes to partition by. In this case, a
> >> repartition job would be required as well.
> >>
> >> > This would be problematic as the order of the dictionary keys can
> >>change
> >> >but would still mean the same thing.  In order to use JSON as a serde
> >>for
> >> >keys, you'd need to enforce a sort order on dictionaries.
> >>
> >> I struggled with this as well. We basically need a forced ordering for
> >>the
> >> JSON keys in SAMZA-348. Originally, I was thinking of making the
> >>key/value
> >> messages just a simple string with a delimiter. Something like
> >> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
> >> approach is also much more compact than JSON. The problem with the
> >>latter
> >> approach is that it doesn't easily allow for hierarchical key/value
> >>pairs.
> >>
> >
> >I've been constructing string keys in my jobs so far as you mentioned but
> >it adds extra boilerplate to the code.  It would be nice if there were an
> >automatic way to do it.
> >
> >
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 9/24/14 4:55 PM, "Roger Hoover" <ro...@gmail.com> wrote:
> >>
> >> >Hi all,
> >> >
> >> >So it seems like one of the first decisions that you have to make when
> >> >creating a Samza job is how many partitions to have in your input
> >>topics.
> >> >This will dictate how many tasks are created and how many changelog
> >> >partitions get created.  It's great that you can independently change
> >>the
> >> >number of Samza containers that get deployed but what do you do once
> >>you
> >> >reach the max (# containers == # tasks)?
> >> >
> >> >If the job's input topics are partitioned by key, then you cannot add
> >>more
> >> >partitions without corrupting existing state.  Does this come up for
> >> >people
> >> >in practice?  How do you handle it?
> >> >
> >> >Just trying to think it through, it seems like you need a procedure
> >> >something like this:
> >> >
> >> >1) Create new topics to hold the same data but with more partitions
> >> >(inputs, outputs, and changelog topics)
> >> >2) Deploy jobs to repartition inputs and changelog topics into the new
> >> >topics
> >> >3) When caught up, stop the running job
> >> >4) Change job config to point to new topics and restart the job (if all
> >> >topics are new, this can be done while previous job run is still active
> >> >using new job.id)
> >> >5) Change downstream jobs to use new output topic if necessary.  Doing
> >> >this
> >> >in a safe way might be hard.
> >> >
> >> >Ideally at some point, this process could be automated.  I was
> >>wondering
> >> >whether a generic task could be written for step #2 but I think it
> >>would
> >> >require a couple of constraints:
> >> >
> >> >1) All meaningfully-partitioned topics would need to include their
> >>keys in
> >> >the stream.  In Kafka, this is optional unless you enable compaction
> >>but
> >> >for this to work generically, it would have to be mandatory in Samza
> >>for
> >> >any stream for which partitions have meaning (not using random or
> >> >round-robin partitioning).
> >> >2) The partition keys should be re-hashable based on their raw byte
> >> >representation so that the repartition task would not have to know how
> >>to
> >> >deserialize the keys in order to compute their new partition.  At first
> >> >glance, this doesn't seem too onerous but I saw in the Config Stream
> >> >proposal (SAMZA-348) that keys might be JSON:
> >> >
> >>
> >>>{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-t
> >>>ha
> >> >t-is-really-long.1000"
> >> >}
> >> >
> >> >This would be problematic as the order of the dictionary keys can
> >>change
> >> >but would still mean the same thing.  In order to use JSON as a serde
> >>for
> >> >keys, you'd need to enforce a sort order on dictionaries.
> >> >
> >> >I'm curious what others do about this or what your thoughts are.
> >>Thanks,
> >> >
> >> >Roger
> >>
> >>
>
>

Re: How to deal with scaling?

Posted by Chris Riccomini <cr...@linkedin.com.INVALID>.
Hey Roger,

Sorry for the late reply. Trying to load balance across multiple
obligations.

> Just to be more explicit about  "starting it from scratch".  The only
>way to do this theoretically correctly, I think, would be to have the
>newly partitioned job start with no state and playback it's input topics
>from the beginning of time.

Correct. Though, it's worth considering what "beginning of time" means.
When you start any Samza job for the first time, you don't generally replay
all data since the beginning of time. It usually just picks up from
time T, and moves forward from there. In this case, I think the time T
that you'd want to pick up from would probably depend on the logic of the
job. It could be "now", "last checkpoint of the old job", "a week ago",
etc.
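
If the answer is "oldest available" rather than "now", I think that's just an
offset default in the job's config (if I'm remembering the property names
right; the stream name here is made up):

  # start new inputs from the earliest available offset
  systems.kafka.samza.offset.default=oldest

  # or override it per stream
  systems.kafka.streams.my-input-topic.samza.offset.default=oldest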

> This brings me to another question about deployment.  Do you recommend
>having two separate Kafka clusters?  In the "public" cluster, brokers
>would be deployed on machines by themselves. Then you have another Kafka
>cluster for Samza in which the brokers are co-located with YARN
>NodeManagers on each machine.  With this approach, Samza topologies would
>consume from and ultimately publish to topics on the "public" cluster.
>All of the internal topics like repartitioning, changelog, etc. would be
>hidden away in the Kafka cluster dedicated to Samza.

As Jakob said, this is how we've been running over the past couple of
years. We use MirrorMaker to pull data back and forth between the two
clusters (be careful of cycles, though). Recently, we moved the NMs off
of the Kafka broker boxes for the Samza grid. The main reason for this was
that the stateful Samza jobs were using page cache as a side-effect of
interacting with the LevelDB state. This caused a degradation in
performance for the Kafka brokers that were running alongside the Samza
jobs. Kafka is very page cache sensitive, since it uses it as an in-memory
buffer for the most recent N minutes of messages. By pulling the jobs off
into their own boxes, we were able to solve some performance issues that
we were seeing with the Kafka brokers.
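
(For reference, the mirroring itself is just the stock MirrorMaker tool.
From memory, the invocation is along these lines, with the consumer config
pointing at the source cluster and the producer config at the target
cluster; the file names are placeholders:

  bin/kafka-run-class.sh kafka.tools.MirrorMaker \
    --consumer.config source-cluster-consumer.properties \
    --producer.config target-cluster-producer.properties \
    --whitelist 'topics-to-mirror.*' \
    --num.streams 4
)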

The locality gain that we were getting by running both jobs and brokers
together was never measured. I believe it's probably pretty negligible
(and degrades as the cluster size increases). Thoughts on locality are
here:

https://issues.apache.org/jira/browse/SAMZA-335


Kafka's partitioning model does not lend itself all that well to locality
optimization (vs. a block store like HDFS).

Anyway, food for thought.

Cheers,
Chris

On 9/26/14 11:20 AM, "Roger Hoover" <ro...@gmail.com> wrote:

>Chris,
>
>Thanks for the great answers.  It's helping me clear up my thinking...
>
>On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
>criccomini@linkedin.com.invalid> wrote:
>
>> Hey Roger,
>>
>> > If the job's input topics are partitioned by key, then you cannot add
>> >more partitions without corrupting existing state.
>>
>> This is correct.
>>
>> > Does this come up for people in practice?
>>
>> It does come up occasionally for us. Thus far, we usually just run a
>>Kafka
>> topic-partition expansion (thereby trashing the semantics of the
>> partitioning) and restart the job. Inconsistent output is then emitted
>>for
>> a while. We do this only when we agree that inconsistent output is
>> tolerable.
>>
>
>Thanks.  This might be a reasonable in many cases (not sure yet).
>
>
>>
>> Another thing we do for this is over-partition our Kafka topics when
>>we're
>> concerned about growth.
>>
>> Both of these solutions are admittedly hacky. As you said, the ideal
>> solution would be some kind of automatic migration. It seems possible
>>that
>> the AM (job coordinator) might be able to manage this, especially of we
>> had a pre-packaged "repartition job" that it could trigger. I haven't
>> thought about this in detail, though.
>>
>> > Deploy jobs to repartition inputs and changelog topics into the new
>> >topics
>>
>> The changelog topic seems problematic to me. It seems that they key used
>> in the changelog might not always be directly related to the
>>partitioning
>> of the input topic. For example,  if you have a StreamTask that is
>> consuming a single input partition, and keeping a count in the state
>>store
>> of all messages that it sees, how do you repartition this changelog? In
>> the new world, the keys for the single partition that it's consuming
>>could
>> be spread across many different partitions, and the count is pretty much
>> meaningless, since it can't be split up by key.
>>
>> It almost feels like state has to be totally reset to safely do an input
>> partition expansion under all cases. In a sense, you have to treat the
>>new
>> job as a job that's completely new, and start it from scratch.
>>
>
>Ah, you're right.  I think there's no way to migrate state in general.  If
>a job is saving any kind of aggregate state then that's an irreversible
>operation that was done on the old partition.  There's not enough
>information to "repartition" the results.
>
>Just to be more explicit about  "starting it from scratch".  The only way
>to do this theoretically correctly, I think, would be to have the newly
>partitioned job start with no state and playback it's input topics from
>the
>beginning of time.
>
>
>
>>
>> > Change job config to point to new topics and restart the job
>>
>> One problem with this is going to be the case where you don't control
>>the
>> producers for the old input topic. They'd either have to be migrated to
>> produce to the new input topic for your job, or you'd have to
>>permanently
>> run the repartition job to move data from the original topic to the
>> currently expanded topic. Keeping the repartition job is not all that
>>wild
>> of an idea. Most Samza topologies we run have some form of a repartition
>> job that runs permanently at the beginning of their flow.
>>
>
>I was thinking about repartitioning as a good design pattern as well.
>Having your job always repartition the input decouples it from the it's
>upstream topic dependencies.  This brings me to another question about
>deployment.  Do you recommend having two separate Kafka clusters?  In the
>"public" cluster, brokers would be deployed on machines by themselves.
>Then you have another Kafka cluster for Samza in which the brokers are
>co-located with YARN NodeManagers on each machine.  With this approach,
>Samza topologies would consume from and ultimately publish to topics on
>the
>"public" cluster.  All of the internal topics like repartitioning,
>changelog, etc. would be hidden away in the Kafka cluster dedicated to
>Samza.
>
>
>>
>> > All meaningfully-partitioned topics would need to include their keys
>>in
>> >the stream
>>
>> True. Somewhat tangential to this is the case where the key that's been
>> used is not the one your job wishes to partition by. In this case, a
>> repartition job would be required as well.
>>
>> > This would be problematic as the order of the dictionary keys can
>>change
>> >but would still mean the same thing.  In order to use JSON as a serde
>>for
>> >keys, you'd need to enforce a sort order on dictionaries.
>>
>> I struggled with this as well. We basically need a forced ordering for
>>the
>> JSON keys in SAMZA-348. Originally, I was thinking of making the
>>key/value
>> messages just a simple string with a delimiter. Something like
>> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
>> approach is also much more compact than JSON. The problem with the
>>latter
>> approach is that it doesn't easily allow for hierarchical key/value
>>pairs.
>>
>
>I've been constructing string keys in my jobs so far as you mentioned but
>it adds extra boilerplate to the code.  It would be nice if there were an
>automatic way to do it.
>
>
>>
>> Cheers,
>> Chris
>>
>> On 9/24/14 4:55 PM, "Roger Hoover" <ro...@gmail.com> wrote:
>>
>> >Hi all,
>> >
>> >So it seems like one of the first decisions that you have to make when
>> >creating a Samza job is how many partitions to have in your input
>>topics.
>> >This will dictate how many tasks are created and how many changelog
>> >partitions get created.  It's great that you can independently change
>>the
>> >number of Samza containers that get deployed but what do you do once
>>you
>> >reach the max (# containers == # tasks)?
>> >
>> >If the job's input topics are partitioned by key, then you cannot add
>>more
>> >partitions without corrupting existing state.  Does this come up for
>> >people
>> >in practice?  How do you handle it?
>> >
>> >Just trying to think it through, it seems like you need a procedure
>> >something like this:
>> >
>> >1) Create new topics to hold the same data but with more partitions
>> >(inputs, outputs, and changelog topics)
>> >2) Deploy jobs to repartition inputs and changelog topics into the new
>> >topics
>> >3) When caught up, stop the running job
>> >4) Change job config to point to new topics and restart the job (if all
>> >topics are new, this can be done while previous job run is still active
>> >using new job.id)
>> >5) Change downstream jobs to use new output topic if necessary.  Doing
>> >this
>> >in a safe way might be hard.
>> >
>> >Ideally at some point, this process could be automated.  I was
>>wondering
>> >whether a generic task could be written for step #2 but I think it
>>would
>> >require a couple of constraints:
>> >
>> >1) All meaningfully-partitioned topics would need to include their
>>keys in
>> >the stream.  In Kafka, this is optional unless you enable compaction
>>but
>> >for this to work generically, it would have to be mandatory in Samza
>>for
>> >any stream for which partitions have meaning (not using random or
>> >round-robin partitioning).
>> >2) The partition keys should be re-hashable based on their raw byte
>> >representation so that the repartition task would not have to know how
>>to
>> >deserialize the keys in order to compute their new partition.  At first
>> >glance, this doesn't seem too onerous but I saw in the Config Stream
>> >proposal (SAMZA-348) that keys might be JSON:
>> >
>> 
>>>{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-t
>>>ha
>> >t-is-really-long.1000"
>> >}
>> >
>> >This would be problematic as the order of the dictionary keys can
>>change
>> >but would still mean the same thing.  In order to use JSON as a serde
>>for
>> >keys, you'd need to enforce a sort order on dictionaries.
>> >
>> >I'm curious what others do about this or what your thoughts are.
>>Thanks,
>> >
>> >Roger
>>
>>


Re: How to deal with scaling?

Posted by Roger Hoover <ro...@gmail.com>.
Thanks, Jakob.

On Fri, Sep 26, 2014 at 2:21 PM, Jakob Homan <jg...@gmail.com> wrote:

> This is pretty close to how things are laid out.  The data from the 'public
> facing' kafka clusters are Mirror-made into Samza-specific Kafka clusters,
> which are colocated (though not necessarily on the same box) as the YARN
> resources.  Data produced through the Samza jobs is written to the Samza
> cluster and then  mirror-made to other clusters for consumption.  This
> approach has the advantage of keeping the Samza processes separate,
> controlled and out of the production path.  The disadvantage is more
> complexity, machines and a tiny bit of latency via the mirror making, but
> overall this approach is pretty rock solid.
>
> -Jakob
>
>
> On Fri, Sep 26, 2014 at 2:14 PM, Roger Hoover <ro...@gmail.com>
> wrote:
>
> > Chris,
> >
> > Would mind giving some advice on my deployment question below?
> >
> > "Do you recommend having two separate Kafka clusters?  In the "public"
> > cluster, brokers would be deployed on machines by themselves.  Then you
> > have another Kafka cluster for Samza in which the brokers are co-located
> > with YARN NodeManagers on each machine.  With this approach, Samza
> > topologies would consume from and ultimately publish to topics on the
> > "public" cluster.  All of the internal topics like repartitioning,
> > changelog, etc. would be hidden away in the Kafka cluster dedicated to
> > Samza."
> >
> > Thanks,
> >
> > Roger
> >
> > On Fri, Sep 26, 2014 at 11:20 AM, Roger Hoover <ro...@gmail.com>
> > wrote:
> >
> > > Chris,
> > >
> > > Thanks for the great answers.  It's helping me clear up my thinking...
> > >
> > > On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
> > > criccomini@linkedin.com.invalid> wrote:
> > >
> > >> Hey Roger,
> > >>
> > >> > If the job's input topics are partitioned by key, then you cannot
> add
> > >> >more partitions without corrupting existing state.
> > >>
> > >> This is correct.
> > >>
> > >> > Does this come up for people in practice?
> > >>
> > >> It does come up occasionally for us. Thus far, we usually just run a
> > Kafka
> > >> topic-partition expansion (thereby trashing the semantics of the
> > >> partitioning) and restart the job. Inconsistent output is then emitted
> > for
> > >> a while. We do this only when we agree that inconsistent output is
> > >> tolerable.
> > >>
> > >
> > > Thanks.  This might be a reasonable in many cases (not sure yet).
> > >
> > >
> > >>
> > >> Another thing we do for this is over-partition our Kafka topics when
> > we're
> > >> concerned about growth.
> > >>
> > >> Both of these solutions are admittedly hacky. As you said, the ideal
> > >> solution would be some kind of automatic migration. It seems possible
> > that
> > >> the AM (job coordinator) might be able to manage this, especially of
> we
> > >> had a pre-packaged "repartition job" that it could trigger. I haven't
> > >> thought about this in detail, though.
> > >>
> > >> > Deploy jobs to repartition inputs and changelog topics into the new
> > >> >topics
> > >>
> > >> The changelog topic seems problematic to me. It seems that they key
> used
> > >> in the changelog might not always be directly related to the
> > partitioning
> > >> of the input topic. For example,  if you have a StreamTask that is
> > >> consuming a single input partition, and keeping a count in the state
> > store
> > >> of all messages that it sees, how do you repartition this changelog?
> In
> > >> the new world, the keys for the single partition that it's consuming
> > could
> > >> be spread across many different partitions, and the count is pretty
> much
> > >> meaningless, since it can't be split up by key.
> > >>
> > >> It almost feels like state has to be totally reset to safely do an
> input
> > >> partition expansion under all cases. In a sense, you have to treat the
> > new
> > >> job as a job that's completely new, and start it from scratch.
> > >>
> > >
> > > Ah, you're right.  I think there's no way to migrate state in general.
> > If
> > > a job is saving any kind of aggregate state then that's an irreversible
> > > operation that was done on the old partition.  There's not enough
> > > information to "repartition" the results.
> > >
> > > Just to be more explicit about  "starting it from scratch".  The only
> way
> > > to do this theoretically correctly, I think, would be to have the newly
> > > partitioned job start with no state and playback it's input topics from
> > the
> > > beginning of time.
> > >
> > >
> > >
> > >>
> > >> > Change job config to point to new topics and restart the job
> > >>
> > >> One problem with this is going to be the case where you don't control
> > the
> > >> producers for the old input topic. They'd either have to be migrated
> to
> > >> produce to the new input topic for your job, or you'd have to
> > permanently
> > >> run the repartition job to move data from the original topic to the
> > >> currently expanded topic. Keeping the repartition job is not all that
> > wild
> > >> of an idea. Most Samza topologies we run have some form of a
> repartition
> > >> job that runs permanently at the beginning of their flow.
> > >>
> > >
> > > I was thinking about repartitioning as a good design pattern as well.
> > > Having your job always repartition the input decouples it from the it's
> > > upstream topic dependencies.  This brings me to another question about
> > > deployment.  Do you recommend having two separate Kafka clusters?  In
> the
> > > "public" cluster, brokers would be deployed on machines by themselves.
> > > Then you have another Kafka cluster for Samza in which the brokers are
> > > co-located with YARN NodeManagers on each machine.  With this approach,
> > > Samza topologies would consume from and ultimately publish to topics on
> > the
> > > "public" cluster.  All of the internal topics like repartitioning,
> > > changelog, etc. would be hidden away in the Kafka cluster dedicated to
> > > Samza.
> > >
> > >
> > >>
> > >> > All meaningfully-partitioned topics would need to include their keys
> > in
> > >> >the stream
> > >>
> > >> True. Somewhat tangential to this is the case where the key that's
> been
> > >> used is not the one your job wishes to partition by. In this case, a
> > >> repartition job would be required as well.
> > >>
> > >> > This would be problematic as the order of the dictionary keys can
> > change
> > >> >but would still mean the same thing.  In order to use JSON as a serde
> > for
> > >> >keys, you'd need to enforce a sort order on dictionaries.
> > >>
> > >> I struggled with this as well. We basically need a forced ordering for
> > the
> > >> JSON keys in SAMZA-348. Originally, I was thinking of making the
> > key/value
> > >> messages just a simple string with a delimiter. Something like
> > >> <type>:<key> for the key and <host>:<source>:<blah> for the value.
> This
> > >> approach is also much more compact than JSON. The problem with the
> > latter
> > >> approach is that it doesn't easily allow for hierarchical key/value
> > pairs.
> > >>
> > >
> > > I've been constructing string keys in my jobs so far as you mentioned
> but
> > > it adds extra boilerplate to the code.  It would be nice if there were
> an
> > > automatic way to do it.
> > >
> > >
> > >>
> > >> Cheers,
> > >> Chris
> > >>
> > >> On 9/24/14 4:55 PM, "Roger Hoover" <ro...@gmail.com> wrote:
> > >>
> > >> >Hi all,
> > >> >
> > >> >So it seems like one of the first decisions that you have to make
> when
> > >> >creating a Samza job is how many partitions to have in your input
> > topics.
> > >> >This will dictate how many tasks are created and how many changelog
> > >> >partitions get created.  It's great that you can independently change
> > the
> > >> >number of Samza containers that get deployed but what do you do once
> > you
> > >> >reach the max (# containers == # tasks)?
> > >> >
> > >> >If the job's input topics are partitioned by key, then you cannot add
> > >> more
> > >> >partitions without corrupting existing state.  Does this come up for
> > >> >people
> > >> >in practice?  How do you handle it?
> > >> >
> > >> >Just trying to think it through, it seems like you need a procedure
> > >> >something like this:
> > >> >
> > >> >1) Create new topics to hold the same data but with more partitions
> > >> >(inputs, outputs, and changelog topics)
> > >> >2) Deploy jobs to repartition inputs and changelog topics into the
> new
> > >> >topics
> > >> >3) When caught up, stop the running job
> > >> >4) Change job config to point to new topics and restart the job (if
> all
> > >> >topics are new, this can be done while previous job run is still
> active
> > >> >using new job.id)
> > >> >5) Change downstream jobs to use new output topic if necessary.
> Doing
> > >> >this
> > >> >in a safe way might be hard.
> > >> >
> > >> >Ideally at some point, this process could be automated.  I was
> > wondering
> > >> >whether a generic task could be written for step #2 but I think it
> > would
> > >> >require a couple of constraints:
> > >> >
> > >> >1) All meaningfully-partitioned topics would need to include their
> keys
> > >> in
> > >> >the stream.  In Kafka, this is optional unless you enable compaction
> > but
> > >> >for this to work generically, it would have to be mandatory in Samza
> > for
> > >> >any stream for which partitions have meaning (not using random or
> > >> >round-robin partitioning).
> > >> >2) The partition keys should be re-hashable based on their raw byte
> > >> >representation so that the repartition task would not have to know
> how
> > to
> > >> >deserialize the keys in order to compute their new partition.  At
> first
> > >> >glance, this doesn't seem too onerous but I saw in the Config Stream
> > >> >proposal (SAMZA-348) that keys might be JSON:
> > >> >
> > >>
> > >>
> >
> >{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-tha
> > >> >t-is-really-long.1000"
> > >> >}
> > >> >
> > >> >This would be problematic as the order of the dictionary keys can
> > change
> > >> >but would still mean the same thing.  In order to use JSON as a serde
> > for
> > >> >keys, you'd need to enforce a sort order on dictionaries.
> > >> >
> > >> >I'm curious what others do about this or what your thoughts are.
> > Thanks,
> > >> >
> > >> >Roger
> > >>
> > >>
> > >
> >
>

Re: How to deal with scaling?

Posted by Jakob Homan <jg...@gmail.com>.
This is pretty close to how things are laid out.  The data from the 'public
facing' Kafka clusters is mirror-made into Samza-specific Kafka clusters,
which are co-located (though not necessarily on the same boxes) with the YARN
resources.  Data produced by the Samza jobs is written to the Samza
cluster and then mirror-made to other clusters for consumption.  This
approach has the advantage of keeping the Samza processes separate,
controlled, and out of the production path.  The disadvantage is more
complexity, more machines, and a tiny bit of latency from the mirroring, but
overall this approach is pretty rock solid.

-Jakob


On Fri, Sep 26, 2014 at 2:14 PM, Roger Hoover <ro...@gmail.com>
wrote:

> Chris,
>
> Would mind giving some advice on my deployment question below?
>
> "Do you recommend having two separate Kafka clusters?  In the "public"
> cluster, brokers would be deployed on machines by themselves.  Then you
> have another Kafka cluster for Samza in which the brokers are co-located
> with YARN NodeManagers on each machine.  With this approach, Samza
> topologies would consume from and ultimately publish to topics on the
> "public" cluster.  All of the internal topics like repartitioning,
> changelog, etc. would be hidden away in the Kafka cluster dedicated to
> Samza."
>
> Thanks,
>
> Roger
>
> On Fri, Sep 26, 2014 at 11:20 AM, Roger Hoover <ro...@gmail.com>
> wrote:
>
> > Chris,
> >
> > Thanks for the great answers.  It's helping me clear up my thinking...
> >
> > On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
> > criccomini@linkedin.com.invalid> wrote:
> >
> >> Hey Roger,
> >>
> >> > If the job's input topics are partitioned by key, then you cannot add
> >> >more partitions without corrupting existing state.
> >>
> >> This is correct.
> >>
> >> > Does this come up for people in practice?
> >>
> >> It does come up occasionally for us. Thus far, we usually just run a
> Kafka
> >> topic-partition expansion (thereby trashing the semantics of the
> >> partitioning) and restart the job. Inconsistent output is then emitted
> for
> >> a while. We do this only when we agree that inconsistent output is
> >> tolerable.
> >>
> >
> > Thanks.  This might be a reasonable in many cases (not sure yet).
> >
> >
> >>
> >> Another thing we do for this is over-partition our Kafka topics when
> we're
> >> concerned about growth.
> >>
> >> Both of these solutions are admittedly hacky. As you said, the ideal
> >> solution would be some kind of automatic migration. It seems possible
> that
> >> the AM (job coordinator) might be able to manage this, especially of we
> >> had a pre-packaged "repartition job" that it could trigger. I haven't
> >> thought about this in detail, though.
> >>
> >> > Deploy jobs to repartition inputs and changelog topics into the new
> >> >topics
> >>
> >> The changelog topic seems problematic to me. It seems that they key used
> >> in the changelog might not always be directly related to the
> partitioning
> >> of the input topic. For example,  if you have a StreamTask that is
> >> consuming a single input partition, and keeping a count in the state
> store
> >> of all messages that it sees, how do you repartition this changelog? In
> >> the new world, the keys for the single partition that it's consuming
> could
> >> be spread across many different partitions, and the count is pretty much
> >> meaningless, since it can't be split up by key.
> >>
> >> It almost feels like state has to be totally reset to safely do an input
> >> partition expansion under all cases. In a sense, you have to treat the
> new
> >> job as a job that's completely new, and start it from scratch.
> >>
> >
> > Ah, you're right.  I think there's no way to migrate state in general.
> If
> > a job is saving any kind of aggregate state then that's an irreversible
> > operation that was done on the old partition.  There's not enough
> > information to "repartition" the results.
> >
> > Just to be more explicit about  "starting it from scratch".  The only way
> > to do this theoretically correctly, I think, would be to have the newly
> > partitioned job start with no state and playback it's input topics from
> the
> > beginning of time.
> >
> >
> >
> >>
> >> > Change job config to point to new topics and restart the job
> >>
> >> One problem with this is going to be the case where you don't control
> the
> >> producers for the old input topic. They'd either have to be migrated to
> >> produce to the new input topic for your job, or you'd have to
> permanently
> >> run the repartition job to move data from the original topic to the
> >> currently expanded topic. Keeping the repartition job is not all that
> wild
> >> of an idea. Most Samza topologies we run have some form of a repartition
> >> job that runs permanently at the beginning of their flow.
> >>
> >
> > I was thinking about repartitioning as a good design pattern as well.
> > Having your job always repartition the input decouples it from the it's
> > upstream topic dependencies.  This brings me to another question about
> > deployment.  Do you recommend having two separate Kafka clusters?  In the
> > "public" cluster, brokers would be deployed on machines by themselves.
> > Then you have another Kafka cluster for Samza in which the brokers are
> > co-located with YARN NodeManagers on each machine.  With this approach,
> > Samza topologies would consume from and ultimately publish to topics on
> the
> > "public" cluster.  All of the internal topics like repartitioning,
> > changelog, etc. would be hidden away in the Kafka cluster dedicated to
> > Samza.
> >
> >
> >>
> >> > All meaningfully-partitioned topics would need to include their keys
> in
> >> >the stream
> >>
> >> True. Somewhat tangential to this is the case where the key that's been
> >> used is not the one your job wishes to partition by. In this case, a
> >> repartition job would be required as well.
> >>
> >> > This would be problematic as the order of the dictionary keys can
> change
> >> >but would still mean the same thing.  In order to use JSON as a serde
> for
> >> >keys, you'd need to enforce a sort order on dictionaries.
> >>
> >> I struggled with this as well. We basically need a forced ordering for
> the
> >> JSON keys in SAMZA-348. Originally, I was thinking of making the
> key/value
> >> messages just a simple string with a delimiter. Something like
> >> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
> >> approach is also much more compact than JSON. The problem with the
> latter
> >> approach is that it doesn't easily allow for hierarchical key/value
> pairs.
> >>
> >
> > I've been constructing string keys in my jobs so far as you mentioned but
> > it adds extra boilerplate to the code.  It would be nice if there were an
> > automatic way to do it.
> >
> >
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 9/24/14 4:55 PM, "Roger Hoover" <ro...@gmail.com> wrote:
> >>
> >> >Hi all,
> >> >
> >> >So it seems like one of the first decisions that you have to make when
> >> >creating a Samza job is how many partitions to have in your input
> topics.
> >> >This will dictate how many tasks are created and how many changelog
> >> >partitions get created.  It's great that you can independently change
> the
> >> >number of Samza containers that get deployed but what do you do once
> you
> >> >reach the max (# containers == # tasks)?
> >> >
> >> >If the job's input topics are partitioned by key, then you cannot add
> >> more
> >> >partitions without corrupting existing state.  Does this come up for
> >> >people
> >> >in practice?  How do you handle it?
> >> >
> >> >Just trying to think it through, it seems like you need a procedure
> >> >something like this:
> >> >
> >> >1) Create new topics to hold the same data but with more partitions
> >> >(inputs, outputs, and changelog topics)
> >> >2) Deploy jobs to repartition inputs and changelog topics into the new
> >> >topics
> >> >3) When caught up, stop the running job
> >> >4) Change job config to point to new topics and restart the job (if all
> >> >topics are new, this can be done while previous job run is still active
> >> >using new job.id)
> >> >5) Change downstream jobs to use new output topic if necessary.  Doing
> >> >this
> >> >in a safe way might be hard.
> >> >
> >> >Ideally at some point, this process could be automated.  I was
> wondering
> >> >whether a generic task could be written for step #2 but I think it
> would
> >> >require a couple of constraints:
> >> >
> >> >1) All meaningfully-partitioned topics would need to include their keys
> >> in
> >> >the stream.  In Kafka, this is optional unless you enable compaction
> but
> >> >for this to work generically, it would have to be mandatory in Samza
> for
> >> >any stream for which partitions have meaning (not using random or
> >> >round-robin partitioning).
> >> >2) The partition keys should be re-hashable based on their raw byte
> >> >representation so that the repartition task would not have to know how
> to
> >> >deserialize the keys in order to compute their new partition.  At first
> >> >glance, this doesn't seem too onerous but I saw in the Config Stream
> >> >proposal (SAMZA-348) that keys might be JSON:
> >> >
> >>
> >>
> >{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-tha
> >> >t-is-really-long.1000"
> >> >}
> >> >
> >> >This would be problematic as the order of the dictionary keys can
> change
> >> >but would still mean the same thing.  In order to use JSON as a serde
> for
> >> >keys, you'd need to enforce a sort order on dictionaries.
> >> >
> >> >I'm curious what others do about this or what your thoughts are.
> Thanks,
> >> >
> >> >Roger
> >>
> >>
> >
>

Re: How to deal with scaling?

Posted by Roger Hoover <ro...@gmail.com>.
Chris,

Would you mind giving some advice on my deployment question below?

"Do you recommend having two separate Kafka clusters?  In the "public"
cluster, brokers would be deployed on machines by themselves.  Then you
have another Kafka cluster for Samza in which the brokers are co-located
with YARN NodeManagers on each machine.  With this approach, Samza
topologies would consume from and ultimately publish to topics on the
"public" cluster.  All of the internal topics like repartitioning,
changelog, etc. would be hidden away in the Kafka cluster dedicated to
Samza."

Thanks,

Roger

On Fri, Sep 26, 2014 at 11:20 AM, Roger Hoover <ro...@gmail.com>
wrote:

> Chris,
>
> Thanks for the great answers.  It's helping me clear up my thinking...
>
> On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
> criccomini@linkedin.com.invalid> wrote:
>
>> Hey Roger,
>>
>> > If the job's input topics are partitioned by key, then you cannot add
>> >more partitions without corrupting existing state.
>>
>> This is correct.
>>
>> > Does this come up for people in practice?
>>
>> It does come up occasionally for us. Thus far, we usually just run a Kafka
>> topic-partition expansion (thereby trashing the semantics of the
>> partitioning) and restart the job. Inconsistent output is then emitted for
>> a while. We do this only when we agree that inconsistent output is
>> tolerable.
>>
>
> Thanks.  This might be a reasonable in many cases (not sure yet).
>
>
>>
>> Another thing we do for this is over-partition our Kafka topics when we're
>> concerned about growth.
>>
>> Both of these solutions are admittedly hacky. As you said, the ideal
>> solution would be some kind of automatic migration. It seems possible that
>> the AM (job coordinator) might be able to manage this, especially of we
>> had a pre-packaged "repartition job" that it could trigger. I haven't
>> thought about this in detail, though.
>>
>> > Deploy jobs to repartition inputs and changelog topics into the new
>> >topics
>>
>> The changelog topic seems problematic to me. It seems that they key used
>> in the changelog might not always be directly related to the partitioning
>> of the input topic. For example,  if you have a StreamTask that is
>> consuming a single input partition, and keeping a count in the state store
>> of all messages that it sees, how do you repartition this changelog? In
>> the new world, the keys for the single partition that it's consuming could
>> be spread across many different partitions, and the count is pretty much
>> meaningless, since it can't be split up by key.
>>
>> It almost feels like state has to be totally reset to safely do an input
>> partition expansion under all cases. In a sense, you have to treat the new
>> job as a job that's completely new, and start it from scratch.
>>
>
> Ah, you're right.  I think there's no way to migrate state in general.  If
> a job is saving any kind of aggregate state then that's an irreversible
> operation that was done on the old partition.  There's not enough
> information to "repartition" the results.
>
> Just to be more explicit about  "starting it from scratch".  The only way
> to do this theoretically correctly, I think, would be to have the newly
> partitioned job start with no state and playback it's input topics from the
> beginning of time.
>
>
>
>>
>> > Change job config to point to new topics and restart the job
>>
>> One problem with this is going to be the case where you don't control the
>> producers for the old input topic. They'd either have to be migrated to
>> produce to the new input topic for your job, or you'd have to permanently
>> run the repartition job to move data from the original topic to the
>> currently expanded topic. Keeping the repartition job is not all that wild
>> of an idea. Most Samza topologies we run have some form of a repartition
>> job that runs permanently at the beginning of their flow.
>>
>
> I was thinking about repartitioning as a good design pattern as well.
> Having your job always repartition the input decouples it from the it's
> upstream topic dependencies.  This brings me to another question about
> deployment.  Do you recommend having two separate Kafka clusters?  In the
> "public" cluster, brokers would be deployed on machines by themselves.
> Then you have another Kafka cluster for Samza in which the brokers are
> co-located with YARN NodeManagers on each machine.  With this approach,
> Samza topologies would consume from and ultimately publish to topics on the
> "public" cluster.  All of the internal topics like repartitioning,
> changelog, etc. would be hidden away in the Kafka cluster dedicated to
> Samza.
>
>
>>
>> > All meaningfully-partitioned topics would need to include their keys in
>> >the stream
>>
>> True. Somewhat tangential to this is the case where the key that's been
>> used is not the one your job wishes to partition by. In this case, a
>> repartition job would be required as well.
>>
>> > This would be problematic as the order of the dictionary keys can change
>> >but would still mean the same thing.  In order to use JSON as a serde for
>> >keys, you'd need to enforce a sort order on dictionaries.
>>
>> I struggled with this as well. We basically need a forced ordering for the
>> JSON keys in SAMZA-348. Originally, I was thinking of making the key/value
>> messages just a simple string with a delimiter. Something like
>> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
>> approach is also much more compact than JSON. The problem with the latter
>> approach is that it doesn't easily allow for hierarchical key/value pairs.
>>
>
> I've been constructing string keys in my jobs so far as you mentioned but
> it adds extra boilerplate to the code.  It would be nice if there were an
> automatic way to do it.
>
>
>>
>> Cheers,
>> Chris
>>
>> On 9/24/14 4:55 PM, "Roger Hoover" <ro...@gmail.com> wrote:
>>
>> >Hi all,
>> >
>> >So it seems like one of the first decisions that you have to make when
>> >creating a Samza job is how many partitions to have in your input topics.
>> >This will dictate how many tasks are created and how many changelog
>> >partitions get created.  It's great that you can independently change the
>> >number of Samza containers that get deployed but what do you do once you
>> >reach the max (# containers == # tasks)?
>> >
>> >If the job's input topics are partitioned by key, then you cannot add
>> more
>> >partitions without corrupting existing state.  Does this come up for
>> >people
>> >in practice?  How do you handle it?
>> >
>> >Just trying to think it through, it seems like you need a procedure
>> >something like this:
>> >
>> >1) Create new topics to hold the same data but with more partitions
>> >(inputs, outputs, and changelog topics)
>> >2) Deploy jobs to repartition inputs and changelog topics into the new
>> >topics
>> >3) When caught up, stop the running job
>> >4) Change job config to point to new topics and restart the job (if all
>> >topics are new, this can be done while previous job run is still active
>> >using new job.id)
>> >5) Change downstream jobs to use new output topic if necessary.  Doing
>> >this
>> >in a safe way might be hard.
>> >
>> >Ideally at some point, this process could be automated.  I was wondering
>> >whether a generic task could be written for step #2 but I think it would
>> >require a couple of constraints:
>> >
>> >1) All meaningfully-partitioned topics would need to include their keys
>> in
>> >the stream.  In Kafka, this is optional unless you enable compaction but
>> >for this to work generically, it would have to be mandatory in Samza for
>> >any stream for which partitions have meaning (not using random or
>> >round-robin partitioning).
>> >2) The partition keys should be re-hashable based on their raw byte
>> >representation so that the repartition task would not have to know how to
>> >deserialize the keys in order to compute their new partition.  At first
>> >glance, this doesn't seem too onerous but I saw in the Config Stream
>> >proposal (SAMZA-348) that keys might be JSON:
>> >
>>
>> >{"type":"offset","key","my-long-system-name.my-even-longer-stream-name-tha
>> >t-is-really-long.1000"
>> >}
>> >
>> >This would be problematic as the order of the dictionary keys can change
>> >but would still mean the same thing.  In order to use JSON as a serde for
>> >keys, you'd need to enforce a sort order on dictionaries.
>> >
>> >I'm curious what others do about this or what your thoughts are.  Thanks,
>> >
>> >Roger
>>
>>
>

Re: How to deal with scaling?

Posted by Roger Hoover <ro...@gmail.com>.
Chris,

Thanks for the great answers.  It's helping me clear up my thinking...

On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey Roger,
>
> > If the job's input topics are partitioned by key, then you cannot add
> >more partitions without corrupting existing state.
>
> This is correct.
>
> > Does this come up for people in practice?
>
> It does come up occasionally for us. Thus far, we usually just run a Kafka
> topic-partition expansion (thereby trashing the semantics of the
> partitioning) and restart the job. Inconsistent output is then emitted for
> a while. We do this only when we agree that inconsistent output is
> tolerable.
>

Thanks.  This might be reasonable in many cases (not sure yet).


>
> Another thing we do for this is over-partition our Kafka topics when we're
> concerned about growth.
>
> Both of these solutions are admittedly hacky. As you said, the ideal
> solution would be some kind of automatic migration. It seems possible that
> the AM (job coordinator) might be able to manage this, especially of we
> had a pre-packaged "repartition job" that it could trigger. I haven't
> thought about this in detail, though.
>
> > Deploy jobs to repartition inputs and changelog topics into the new
> >topics
>
> The changelog topic seems problematic to me. It seems that they key used
> in the changelog might not always be directly related to the partitioning
> of the input topic. For example,  if you have a StreamTask that is
> consuming a single input partition, and keeping a count in the state store
> of all messages that it sees, how do you repartition this changelog? In
> the new world, the keys for the single partition that it's consuming could
> be spread across many different partitions, and the count is pretty much
> meaningless, since it can't be split up by key.
>
> It almost feels like state has to be totally reset to safely do an input
> partition expansion under all cases. In a sense, you have to treat the new
> job as a job that's completely new, and start it from scratch.
>

Ah, you're right.  I think there's no way to migrate state in general.  If
a job is saving any kind of aggregate state, then that aggregation is an
irreversible operation performed against the old partitioning.  There's not
enough information to "repartition" the results.

Just to be more explicit about "starting it from scratch": the only way
to do this theoretically correctly, I think, would be to have the newly
partitioned job start with no state and play back its input topics from the
beginning of time.



>
> > Change job config to point to new topics and restart the job
>
> One problem with this is going to be the case where you don't control the
> producers for the old input topic. They'd either have to be migrated to
> produce to the new input topic for your job, or you'd have to permanently
> run the repartition job to move data from the original topic to the
> currently expanded topic. Keeping the repartition job is not all that wild
> of an idea. Most Samza topologies we run have some form of a repartition
> job that runs permanently at the beginning of their flow.
>

I was thinking about repartitioning as a good design pattern as well.
Having your job always repartition the input decouples it from its
upstream topic dependencies.  This brings me to another question about
deployment.  Do you recommend having two separate Kafka clusters?  In the
"public" cluster, brokers would be deployed on machines by themselves.
Then you have another Kafka cluster for Samza in which the brokers are
co-located with YARN NodeManagers on each machine.  With this approach,
Samza topologies would consume from and ultimately publish to topics on the
"public" cluster.  All of the internal topics like repartitioning,
changelog, etc. would be hidden away in the Kafka cluster dedicated to
Samza.
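
To make the repartition-job idea concrete, here is a rough sketch of what I
imagine the generic task could look like.  It assumes a raw-bytes ("byte")
serde is configured for the input's key and message, and the system/stream
names ("internal-kafka", "page-views-repartitioned") are made up for
illustration.  It just forwards each message keyed by its raw key bytes and
lets the producer's partitioner re-hash it into the wider topic:

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Sketch only: forward raw key/message bytes to a wider topic so the
// producer partitioner re-hashes them without deserializing anything.
public class RepartitionStreamTask implements StreamTask {
  // hypothetical destination on the Samza-internal cluster
  private static final SystemStream OUTPUT =
      new SystemStream("internal-kafka", "page-views-repartitioned");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getKey(),
        envelope.getMessage()));
  }
}

In the two-cluster layout above, this job's input would point at the topic on
the "public" cluster while the repartitioned topic and any changelogs stay on
the cluster co-located with YARN.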


>
> > All meaningfully-partitioned topics would need to include their keys in
> >the stream
>
> True. Somewhat tangential to this is the case where the key that's been
> used is not the one your job wishes to partition by. In this case, a
> repartition job would be required as well.
>
> > This would be problematic as the order of the dictionary keys can change
> >but would still mean the same thing.  In order to use JSON as a serde for
> >keys, you'd need to enforce a sort order on dictionaries.
>
> I struggled with this as well. We basically need a forced ordering for the
> JSON keys in SAMZA-348. Originally, I was thinking of making the key/value
> messages just a simple string with a delimiter. Something like
> <type>:<key> for the key and <host>:<source>:<blah> for the value. This
> approach is also much more compact than JSON. The problem with the
> delimited-string approach is that it doesn't easily allow for hierarchical
> key/value pairs.
>

I've been constructing string keys in my jobs so far, as you mentioned, but
it adds extra boilerplate to the code.  It would be nice if there were an
automatic way to do it.
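
For example, the kind of thing I'd like handled automatically: if keys stay
JSON, one rough way to make the bytes stable is to always serialize map
entries in sorted order.  A sketch assuming Jackson is on the classpath (the
class name and usage are mine, not anything in Samza):

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

// Hypothetical helper: canonical (sorted-entry) JSON bytes for map-shaped
// keys, so byte-level re-hashing stays stable regardless of insertion order.
public class CanonicalJsonKey {
  private static final ObjectMapper MAPPER = new ObjectMapper()
      .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  public static byte[] toBytes(Map<String, Object> key) throws Exception {
    return MAPPER.writeValueAsBytes(key);
  }
}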


>
> Cheers,
> Chris
>

Re: How to deal with scaling?

Posted by Chris Riccomini <cr...@linkedin.com.INVALID>.
Hey Roger,

> If the job's input topics are partitioned by key, then you cannot add
>more partitions without corrupting existing state.

This is correct.

> Does this come up for people in practice?

It does come up occasionally for us. Thus far, we usually just run a Kafka
topic-partition expansion (thereby trashing the semantics of the
partitioning) and restart the job. Inconsistent output is then emitted for
a while. We do this only when we agree that inconsistent output is
tolerable.

Another thing we do for this is over-partition our Kafka topics when we're
concerned about growth.

Both of these solutions are admittedly hacky. As you said, the ideal
solution would be some kind of automatic migration. It seems possible that
the AM (job coordinator) might be able to manage this, especially if we
had a pre-packaged "repartition job" that it could trigger. I haven't
thought about this in detail, though.

> Deploy jobs to repartition inputs and changelog topics into the new
>topics

The changelog topic seems problematic to me. It seems that the key used
in the changelog might not always be directly related to the partitioning
of the input topic. For example, if you have a StreamTask that is
consuming a single input partition, and keeping a count in the state store
of all messages that it sees, how do you repartition this changelog? In
the new world, the keys for the single partition that it's consuming could
be spread across many different partitions, and the count is pretty much
meaningless, since it can't be split up by key.

It almost feels like state has to be totally reset to safely do an input
partition expansion under all cases. In a sense, you have to treat the new
job as a job that's completely new, and start it from scratch.

> Change job config to point to new topics and restart the job

One problem with this is going to be the case where you don't control the
producers for the old input topic. They'd either have to be migrated to
produce to the new input topic for your job, or you'd have to permanently
run the repartition job to move data from the original topic to the
currently expanded topic. Keeping the repartition job is not all that wild
of an idea. Most Samza topologies we run have some form of a repartition
job that runs permanently at the beginning of their flow.

> All meaningfully-partitioned topics would need to include their keys in
>the stream

True. Somewhat tangential to this is the case where the key that's been
used is not the one your job wishes to partition by. In this case, a
repartition job would be required as well.

> This would be problematic as the order of the dictionary keys can change
>but would still mean the same thing.  In order to use JSON as a serde for
>keys, you'd need to enforce a sort order on dictionaries.

I struggled with this as well. We basically need a forced ordering for the
JSON keys in SAMZA-348. Originally, I was thinking of making the key/value
messages just a simple string with a delimiter. Something like
<type>:<key> for the key and <host>:<source>:<blah> for the value. This
approach is also much more compact than JSON. The problem with the
delimited-string approach is that it doesn't easily allow for hierarchical
key/value pairs.

Cheers,
Chris


Re: How to deal with scaling?

Posted by Dan Di Spaltro <da...@gmail.com>.
I am interested in this as well, particularly because, coming from a grown
system (LI), this design makes sense, but with a growing system it's hard to
parse in my head how this would work.  Great question!


-- 
Dan Di Spaltro