Posted to dev@samza.apache.org by Tommy Becker <to...@tivo.com> on 2016/08/11 18:56:42 UTC

Question on changelog partition mapping

We recently had an issue that caused us to lose the contents of one of our Samza job's checkpoint topics. We were not that concerned about losing the checkpointed offsets, so we restarted the job. We then started seeing some very strange results and were able to trace them back to the fact that the changelog partition mapping had changed. We were unaware this data was stored in the checkpoint topic. Can someone explain why this mapping is necessary? I was under the impression that the number of changelog partitions is identical to the number of task instances. If so, can't partitions just be assigned based on the task number? Assuming the mapping is necessary, it would be nice if it were deterministic. Looking at JobCoordinator, it seems to depend on the order in which entries come back in the map produced by the SystemStreamPartitionGrouper. This non-determinism seems to have been the cause of our issues. Obviously data loss is a problem, but it seems like Samza could have recreated the original mapping. Should I file a bug on this?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
tobecker@tivo.com

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

Re: Question on changelog partition mapping

Posted by Tommy Becker <to...@tivo.com>.
Done. https://issues.apache.org/jira/browse/SAMZA-1012

On 08/26/2016 06:57 PM, Yi Pan wrote:

Hi, Tommy,

It is perfectly fine. Would you please open a JIRA to include this
improvement?

Thanks!

-Yi

Re: Question on changelog partition mapping

Posted by Yi Pan <ni...@gmail.com>.
Hi, Tommy,

It is perfectly fine. Would you please open a JIRA to include this
improvement?

Thanks!

-Yi

On Fri, Aug 26, 2016 at 6:11 AM, Tommy Becker <to...@tivo.com> wrote:

> Hey Yi,
>
> Apologies for the lateness of my reply. Yeah, that makes sense, and we can
> certainly implement that. Would you consider accepting a PR that makes this
> change to the standard groupers? It's just strange that the generated
> partition mappings can vary like this, even for identical inputs.
>
> -Tommy

Re: Question on changelog partition mapping

Posted by Tommy Becker <to...@tivo.com>.
Hey Yi,

Apologies for the lateness of my reply. Yeah, that makes sense, and we can certainly implement that. Would you consider accepting a PR that makes this change to the standard groupers? It's just strange that the generated partition mappings can vary like this, even for identical inputs.

-Tommy


On 08/16/2016 03:04 PM, Yi Pan wrote:

Hi, Tommy,

Yes. Now I understand what you referred to as "non-determinism". The
JobCoordinator was designed with the idea that "if no previous run is found,
we are free to start from scratch". I think the solution you can try for now
is to implement a grouper that guarantees the order of the groups coming out
of the group() method.

Does that make sense?

Thanks!

-Yi

Re: Question on changelog partition mapping

Posted by Yi Pan <ni...@gmail.com>.
Hi, Tommy,

Yes. Now I understand what you referred to as "non-determinism". The
JobCoordinator was designed with the idea that "if no previous run is found,
we are free to start from scratch". I think the solution you can try for now
is to implement a grouper that guarantees the order of the groups coming out
of the group() method.

Does that make sense?

Thanks!

-Yi
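The workaround suggested above, a grouper whose group() output comes back in a guaranteed order, might look roughly like the following Java sketch. It is not Samza code: Ssp is a hypothetical stand-in for Samza's SystemStreamPartition, task names are plain strings rather than TaskName objects, and the "Partition N" naming is illustrative. A real implementation would implement the SystemStreamPartitionGrouper interface of the Samza version in use. The grouping is by partition id, one task per id, but the groups are collected through a TreeMap so the result always iterates in ascending numeric partition order, however the input set happens to be ordered.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for Samza's SystemStreamPartition: a stream name
// plus a partition id, with value-based equality.
final class Ssp {
    final String stream;
    final int partition;

    Ssp(String stream, int partition) {
        this.stream = stream;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Ssp)) return false;
        Ssp other = (Ssp) o;
        return partition == other.partition && stream.equals(other.stream);
    }

    @Override
    public int hashCode() {
        return Objects.hash(stream, partition);
    }
}

class OrderedGroupByPartition {

    // Groups inputs by partition id (one task per id) and returns the groups
    // in ascending numeric order of partition id, regardless of input order.
    static Map<String, Set<Ssp>> group(Set<Ssp> ssps) {
        SortedMap<Integer, Set<Ssp>> byPartition = new TreeMap<>();
        for (Ssp ssp : ssps) {
            byPartition.computeIfAbsent(ssp.partition, p -> new HashSet<>()).add(ssp);
        }
        // LinkedHashMap keeps the sorted order when switching to task-name keys.
        Map<String, Set<Ssp>> groups = new LinkedHashMap<>();
        for (Map.Entry<Integer, Set<Ssp>> e : byPartition.entrySet()) {
            groups.put("Partition " + e.getKey(), e.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        Set<Ssp> input = new HashSet<>(Arrays.asList(
                new Ssp("clicks", 10), new Ssp("clicks", 2),
                new Ssp("views", 2), new Ssp("clicks", 0)));
        System.out.println(group(input).keySet()); // [Partition 0, Partition 2, Partition 10]
    }
}
```

Sorting on the numeric partition id rather than on the task-name string keeps "Partition 10" after "Partition 2"; either ordering is deterministic, but the numeric one is easier to read.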

On Fri, Aug 12, 2016 at 5:31 AM, Tommy Becker <to...@tivo.com> wrote:

> Hi Yi,
>
> Thanks for the response. We are running Samza 0.9.1, so we do not yet have
> the coordinator stream. But to answer your other questions, the number of
> task instances did not change. Specifically, the input topic, the number of
> partitions in that topic, and the grouper algorithm all stayed the same. The
> non-determinism I am referring to can be seen here:
>
> https://github.com/apache/samza/blob/0.9.1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala#L141
>
> Since we lost the original mapping, there is no previousChangelogeMapping
> (sic), and the code creates a new mapping by simply assigning sequential
> partition numbers from 0 up to the number of tasks minus one. But the order
> in which these are assigned seems to be determined by the order of the
> TaskNames in the map returned by the SystemStreamPartitionGrouper (i.e. no
> meaningful order). So there is no guarantee that this code will produce the
> same changelog mapping each time it runs, even if the number of tasks is the
> same. Does that make sense? The code has changed somewhat since 0.9.1 but
> seems to have the same issue even in 0.10.1.
>
> -Tommy

Re: Question on changelog partition mapping

Posted by Tommy Becker <to...@tivo.com>.
Hi Yi,

Thanks for the response. We are running Samza 0.9.1, so we do not yet have the coordinator stream. But to answer your other questions, the number of task instances did not change. Specifically, the input topic, the number of partitions in that topic, and the grouper algorithm all stayed the same. The non-determinism I am referring to can be seen here:

https://github.com/apache/samza/blob/0.9.1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala#L141

Since we lost the original mapping, there is no previousChangelogeMapping (sic), and the code creates a new mapping by simply assigning sequential partition numbers from 0 up to the number of tasks minus one. But the order in which these are assigned seems to be determined by the order of the TaskNames in the map returned by the SystemStreamPartitionGrouper (i.e. no meaningful order). So there is no guarantee that this code will produce the same changelog mapping each time it runs, even if the number of tasks is the same. Does that make sense? The code has changed somewhat since 0.9.1 but seems to have the same issue even in 0.10.1.

-Tommy
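To make the ordering problem described above concrete, here is a minimal Java sketch. It is not JobCoordinator's actual code: plain String task names and Integer partition ids stand in for Samza's TaskName and Partition types (a hypothetical simplification), and both method names are illustrative. The first method hands out changelog partitions 0..N-1 in whatever order the grouper's map iterates; the second sorts the task names first, so the same set of tasks always yields the same mapping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ChangelogMappingSketch {

    // Hands out partitions 0, 1, 2, ... in the map's iteration order,
    // so the result depends on how the grouper happened to build its map.
    static Map<String, Integer> assignInIterationOrder(Map<String, Set<Integer>> groups) {
        Map<String, Integer> mapping = new LinkedHashMap<>();
        int next = 0;
        for (String taskName : groups.keySet()) {
            mapping.put(taskName, next++);
        }
        return mapping;
    }

    // Sorts the task names first, so the same set of tasks always produces
    // the same mapping, whatever map implementation the grouper returned.
    static Map<String, Integer> assignInSortedOrder(Map<String, Set<Integer>> groups) {
        List<String> names = new ArrayList<>(groups.keySet());
        Collections.sort(names);
        Map<String, Integer> mapping = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            mapping.put(names.get(i), i);
        }
        return mapping;
    }

    public static void main(String[] args) {
        // The same logical grouping, built in two different insertion orders.
        Map<String, Set<Integer>> a = new LinkedHashMap<>();
        a.put("Partition 0", Collections.singleton(0));
        a.put("Partition 1", Collections.singleton(1));
        a.put("Partition 2", Collections.singleton(2));

        Map<String, Set<Integer>> b = new LinkedHashMap<>();
        b.put("Partition 2", Collections.singleton(2));
        b.put("Partition 0", Collections.singleton(0));
        b.put("Partition 1", Collections.singleton(1));

        System.out.println(assignInIterationOrder(a).equals(assignInIterationOrder(b))); // false
        System.out.println(assignInSortedOrder(a).equals(assignInSortedOrder(b)));       // true
    }
}
```

Lexicographic sorting is enough for determinism, although it places "Partition 10" before "Partition 2". What matters is that the order depends only on the task names, not on map iteration order, and that the same ordering is used on every run.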

On 08/11/2016 06:12 PM, Yi Pan wrote:

Hi, Tommy,

Which version of Samza are you using? Since 0.10, the changelog partition
mapping has been moved to the coordinator stream and is no longer stored in
the checkpoint topic.

That said, I want to ask a few more questions to understand what you
referred to as "non-deterministic" behavior. Between the job restarts, did
the total number of tasks change? As you have observed, the number of
partitions in a changelog topic equals the total number of tasks in a job.
The total number of tasks can change when:
- the number of input topic partitions changed
- the grouper algorithm changed
In both cases, the stored state is no longer considered valid, since data
may already have been shuffled between Kafka partitions or between tasks.

Could you clarify whether you saw the "non-determinism" with or without a
change in the total number of tasks?

Thanks!

-Yi
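Yi's point above, that a changelog topic has exactly one partition per task, suggests a simple consistency check before reusing a stored mapping. The following Java sketch uses hypothetical names throughout and is not Samza's implementation: it keeps a previous task-to-partition mapping only if it covers exactly the current task names with partitions 0..N-1, and otherwise rebuilds one from sorted task names so the rebuild is reproducible. It does nothing about the stored state itself, which, as noted above, is no longer valid once the task count changes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ChangelogMappingResolver {

    // Reuses the previous task -> changelog partition mapping if it is still
    // consistent with the current tasks; otherwise rebuilds it from sorted
    // task names so that a rebuild always yields the same result.
    static Map<String, Integer> resolve(Set<String> currentTasks, Map<String, Integer> previous) {
        if (previous != null && isValidFor(previous, currentTasks)) {
            return new TreeMap<>(previous);
        }
        Map<String, Integer> fresh = new TreeMap<>();
        int next = 0;
        for (String task : new TreeSet<>(currentTasks)) {
            fresh.put(task, next++);
        }
        return fresh;
    }

    // Consistent means: the same task names, and partitions exactly 0..N-1
    // with no duplicates, since the changelog has one partition per task.
    static boolean isValidFor(Map<String, Integer> previous, Set<String> tasks) {
        if (!previous.keySet().equals(tasks)) {
            return false;
        }
        Set<Integer> expected = new HashSet<>();
        for (int i = 0; i < tasks.size(); i++) {
            expected.add(i);
        }
        return new HashSet<>(previous.values()).equals(expected);
    }

    public static void main(String[] args) {
        Set<String> tasks = new HashSet<>(Arrays.asList("Partition 0", "Partition 1"));
        Map<String, Integer> previous = new HashMap<>();
        previous.put("Partition 0", 1);
        previous.put("Partition 1", 0);

        System.out.println(resolve(tasks, previous)); // {Partition 0=1, Partition 1=0}
        System.out.println(resolve(tasks, null));     // {Partition 0=0, Partition 1=1}
    }
}
```

The first call keeps the stored mapping even though it is not in sorted order; only a missing or inconsistent mapping triggers the deterministic rebuild.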

On Thu, Aug 11, 2016 at 11:56 AM, Tommy Becker <to...@tivo.com> wrote:



We recently had an issue that caused us to lose the contents of one of our
Samza job's checkpoint topics. We were not that concerned about losing the
checkpointed offsets and so we restarted the job. We then started seeing
some very strange results and were able to trace it back to the fact that
the changelog partition mapping had changed. We were unaware this data was stored
in the checkpoint topic. Can someone explain why this mapping is necessary?
I was under the impression that the number of changelog partitions is
identical to the number of task instances. If this is so, can't partitions
just be assigned based on the task number? Assuming the mapping is
necessary, it would be nice if it was deterministic. Looking at
JobCoordinator, it seems to be dependent on the order in which things come
back in the map produced by the SystemStreamPartitionGrouper. This
non-determinism seems to have been the cause of our issues. Obviously data
loss is a problem, but it seems like Samza could have recreated the
original mapping. Should I file a bug on this?

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com<http://www.digitalsmiths.com>
tobecker@tivo.com<ma...@tivo.com>

Re: Question on changelog partition mapping

Posted by Yi Pan <ni...@gmail.com>.
Hi, Tommy,

Which version of Samza are you using? Since 0.10, the changelog partition
mapping has been moved to the coordinator stream and is no longer stored in
the checkpoint topic.

That said, I want to ask a few more questions to understand what you
referred to as "non-deterministic" behavior. So, between the job restarts,
did the total number of tasks change? As you have observed, the total
number of partitions in a changelog topic equals the total number of tasks
in a job. The total number of tasks changes when:
- the number of partitions in the input topic changes, or
- the grouper algorithm changes.
In both cases, the stored state is no longer considered valid, since data
may already have been shuffled between the Kafka partitions or between the
tasks.
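
A hedged Python sketch of the validity rule described above, assuming a stored mapping from task name to changelog partition id. The function name, the "Partition N" task names, and the exact checks are illustrative assumptions, not Samza's actual logic.

```python
from typing import Dict, Set


def previous_mapping_is_valid(
    previous_mapping: Dict[str, int],
    current_task_names: Set[str],
    changelog_partition_count: int,
) -> bool:
    """Return True if stored state could be reused under the current tasks.

    Assumed rules (hypothetical, for illustration only):
      * the set of task names is unchanged, and
      * the changelog topic has exactly one partition per task, and
      * each task maps to a distinct partition id in [0, count).
    """
    if set(previous_mapping) != set(current_task_names):
        return False  # tasks were added or removed (e.g. grouper changed)
    if changelog_partition_count != len(current_task_names):
        return False  # partition count no longer matches the task count
    ids = sorted(previous_mapping.values())
    return ids == list(range(changelog_partition_count))


tasks = {"Partition 0", "Partition 1"}
print(previous_mapping_is_valid({"Partition 0": 0, "Partition 1": 1}, tasks, 2))  # True
print(previous_mapping_is_valid({"Partition 0": 0, "Partition 1": 1}, tasks | {"Partition 2"}, 3))  # False
```

A check like this only helps when a previous mapping exists. If the mapping is lost entirely, as in the original question, there is nothing to compare against, so the reassignment itself has to be deterministic to reproduce the original layout.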

Could you clarify whether you saw the "non-determinism" with or without a
change in the total number of tasks?

Thanks!

-Yi

On Thu, Aug 11, 2016 at 11:56 AM, Tommy Becker <to...@tivo.com> wrote:

> We recently had an issue that caused us to lose the contents of one of our
> Samza job's checkpoint topics. We were not that concerned about losing the
> checkpointed offsets and so we restarted the job. We then started seeing
> some very strange results and were able to trace it back to the fact that
> the changelog partition mapping had changed. We were unaware this data was stored
> in the checkpoint topic. Can someone explain why this mapping is necessary?
> I was under the impression that the number of changelog partitions is
> identical to the number of task instances. If this is so, can't partitions
> just be assigned based on the task number? Assuming the mapping is
> necessary, it would be nice if it was deterministic. Looking at
> JobCoordinator, it seems to be dependent on the order in which things come
> back in the map produced by the SystemStreamPartitionGrouper. This
> non-determinism seems to have been the cause of our issues. Obviously data
> loss is a problem, but it seems like Samza could have recreated the
> original mapping. Should I file a bug on this?