Posted to dev@kafka.apache.org by hudeqi <16...@bjtu.edu.cn> on 2023/06/27 15:36:48 UTC

KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Hi all. I would like to submit a minor KIP and hope to get some reviews and suggestions. The KIP is here: https://cwiki.apache.org/confluence/x/vhw0Dw

Motivation:

The 'segment.bytes' config of the Connect cluster's internal offsets topic (offset.storage.topic) follows the broker default unless it is set explicitly. If it stays at the broker default or is set even larger, then when the Connect cluster runs many complicated tasks (for example, replication involving a lot of topics and partitions), and especially when the log volume of the offsets topic is very large, the restart speed of the Connect workers suffers.

The reason is that a consumer thread has to be started at startup to read the data of 'offset.storage.topic'. Although this topic is set to compact, if the segment size is large, such as the default value of 1GB, the topic may hold tens of gigabytes of data (with the default 25 partitions) that cannot be compacted and has to be read from the earliest offset (because the active segment cannot be cleaned). This takes a lot of time and leaves the worker unable to start and execute tasks for a long time.
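To put rough numbers on this, here is a back-of-the-envelope sketch. It only bounds the data held in active segments (one per partition), ignores time-based segment rolls, and reads "1GB" and "50MB" as binary units. The class and method names are hypothetical.

```java
// Back-of-the-envelope sketch; class and method names are hypothetical.
final class OffsetTopicStartupEstimate {
    static final long MIB = 1024L * 1024L;

    /**
     * Upper bound on the bytes sitting in active segments, which the log
     * cleaner never touches, summed over all partitions of the topic.
     */
    static long maxUncleanableBytes(int partitions, long segmentBytes) {
        return partitions * segmentBytes;
    }

    public static void main(String[] args) {
        int partitions = 25;             // default partition count cited above
        long brokerDefault = 1024 * MIB; // 1GB broker default cited above
        long proposed = 50 * MIB;        // default proposed by this KIP

        System.out.println(maxUncleanableBytes(partitions, brokerDefault) / MIB + " MiB");
        System.out.println(maxUncleanableBytes(partitions, proposed) / MIB + " MiB");
    }
}
```

Under these assumptions the bound drops from 25600 MiB (about 25 GiB) to 1250 MiB.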

Proposed changes:

I want to extract the "segment.bytes" setting for "offset.storage.topic" separately, just like "offsets.topic.segment.bytes" and "transaction.state.log.segment.bytes": the size is set by the user, and if there is no explicit setting, a default value such as 50MB is used. This avoids interference from the Kafka broker configuration. As for "config.storage.topic" and "status.storage.topic", they rarely receive a large amount of data in practical use, so similar measures may not be necessary.

For a new Connect cluster, if "offset.storage.segment.bytes" is explicitly set in connect-distributed.properties, the size is set as in KIP-605; it is entirely up to the user. Otherwise, the default size (50MB) is used. Compared to the previous behavior, this setting no longer depends on the Kafka broker configuration, although it is possible that the broker's configured value is smaller, and thus better, than the 50MB default.

For an existing Connect cluster, if "offset.storage.segment.bytes" is explicitly set in connect-distributed.properties, we will update the topic config through the admin client. Otherwise, the default size (50MB) is used to add or update the topic config through the admin client.




Best,

hudeqi



Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by Sagar <sa...@gmail.com>.
Hey Hudeqi,

Thanks for the KIP! After reading the KIP and the comments by Yash and Greg
I agree with these aspects:

1) While I agree that having a high value for segment.bytes config can lead
to higher startup times, we don't necessarily need to expose a separate
config for it(as Yash suggested). We can use the same mechanism as exposed
by KIP-605 to set it to a lower value specifically for Connect. IMO this
could be an internal config as well defined via ConfigDef#defineInternal so
they don't need to show up in the docs as well. I haven't tested it but if
the users do happen to override the config via the KIP-605 mechanism, it
should update. So, the scope of the KIP could be reduced to having an
explicit internal config for offset's topic segment.bytes with a lower
default value. WDYT?

2) I don't think we should let the configs of existing topics be updated.
If you notice both KIP-605 and KIP-154 (the one which 605 cites) don't
allow updating the configs of existing topics. It would be a good idea to
stick with this practice imo.

3) Regarding the default value of 50 MB, tbh I am not totally aware of how
the default values for these configs were chosen in the past. But as
pointed out by Greg, __consumer_offsets topic could be a good example to
follow and a default value of 100MB could be a good starting point. Or if
needed we can be defensive and start with a slightly higher value like
250MB. Also the point about tombstone records leading to inconsistent
in-memory states across multiple workers is a good one. This happens with
status topic as well IIRC and if possible we should look to fix it. That is
outside the scope of the KIP though.

Thanks!
Sagar.


On Fri, Jul 14, 2023 at 1:49 AM Greg Harris <gr...@aiven.io.invalid>
wrote:

> Hey hudeqi,
>
> Thanks for the KIP! I did not know about the existing segment.bytes
> default value, and it does seem rather high in the context of the
> Connect internal topics.
> If I think about the segment.size as a "minimum per-partition data
> transfer on startup", 1GB is certainly not appropriate for even the
> single-partition config topic.
>
> 1. I have a concern about changing the topic configuration on startup.
>
> In the existing codebase, the *.storage.* worker configurations appear
> to only have an effect for newly created topics. If the topics were
> manually created before a Connect cluster starts, or were created by a
> previous Connect instance, then the Connect worker configuration could
> have arbitrary contents that have no effect. Updating the topic
> configurations after creation would be a new capability.
> Consider the situation where someone were to notice this log.segment
> problem, where a natural response would be to reconfigure the topic,
> diverging from the two configurations. When the worker can change the
> topic configuration after creation, that has the potential to roll
> back topic configurations that are managed externally.
> Do you think that changing the default for new Connect clusters, and
> emitting a startup warning for excessive segment.bytes is reasonable?
> We have other startup assertions that fail the startup of a worker
> based on partition and compaction requirements, and this would be
> similar in that it alerts the user to reconfigure the internal topics,
> but with a lesser severity.
>
> 2. I'm also interested to know what a reasonable value for this
> configuration would be. I did find the __consumer_offsets topic uses
> 104857600 (100 MiB) as defined in OffsetConfig.scala, so there is
> precedent for having a smaller segment.size for internal topics.
>
> 3. I believe there's a potential bug where compaction can happen
> before a worker reads a tombstone, leading the KafkaBasedLog to
> produce inconsistent in-memory states across multiple workers. Since
> the segment.size is so large, it makes me think that compaction has
> been wholly ineffective so far, and has prevented this bug from
> manifesting. By lowering the segment.size, we're increasing the
> likelihood of this failure, so it may need to finally be addressed.
>
> Thanks,
> Greg
>
>
>
>
> On Thu, Jul 6, 2023 at 5:39 AM Yash Mayya <ya...@gmail.com> wrote:
> >
> > Also, just adding to the above point - we don't necessarily need to
> > explicitly add a new worker configuration right? Instead, we could
> > potentially just use the new proposed default value internally which can be
> > overridden by users through setting a value for
> > "offset.storage.segment.bytes" (via the existing KIP-605 based mechanism).
> >
> > On Thu, Jul 6, 2023 at 6:04 PM Yash Mayya <ya...@gmail.com> wrote:
> >
> > > Hi hudeqi,
> > >
> > > Thanks for the KIP! Just to clarify - since KIP-605 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings)
> > > already allows configuring "segment.bytes" for the Connect cluster's
> > > offsets topic via a worker configuration ("offset.storage.segment.bytes",
> > > same as what is being proposed in this KIP), the primary motivation of this
> > > KIP is essentially to override the default value for that topic
> > > configuration to a smaller value and decouple it from the backing Kafka
> > > cluster's "log.segment.bytes" configuration? Also, I'm curious about how
> > > the new default value of 50 MB was chosen (if there were any experiments
> > > that were run etc.)?
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Mon, Jul 3, 2023 at 6:08 PM hudeqi <16...@bjtu.edu.cn> wrote:
> > >
> > >> Is anyone following this KIP? Bump this thread.
> > >
> > >
>

Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by Greg Harris <gr...@aiven.io.INVALID>.
Hey hudeqi,

Thanks for the KIP! I did not know about the existing segment.bytes
default value, and it does seem rather high in the context of the
Connect internal topics.
If I think about the segment.size as a "minimum per-partition data
transfer on startup", 1GB is certainly not appropriate for even the
single-partition config topic.

1. I have a concern about changing the topic configuration on startup.

In the existing codebase, the *.storage.* worker configurations appear
to only have an effect for newly created topics. If the topics were
manually created before a Connect cluster starts, or were created by a
previous Connect instance, then the Connect worker configuration could
have arbitrary contents that have no effect. Updating the topic
configurations after creation would be a new capability.
Consider the situation where someone were to notice this log.segment
problem, where a natural response would be to reconfigure the topic,
diverging from the two configurations. When the worker can change the
topic configuration after creation, that has the potential to roll
back topic configurations that are managed externally.
Do you think that changing the default for new Connect clusters, and
emitting a startup warning for excessive segment.bytes is reasonable?
We have other startup assertions that fail the startup of a worker
based on partition and compaction requirements, and this would be
similar in that it alerts the user to reconfigure the internal topics,
but with a lesser severity.

2. I'm also interested to know what a reasonable value for this
configuration would be. I did find the __consumer_offsets topic uses
104857600 (100 MiB) as defined in OffsetConfig.scala, so there is
precedent for having a smaller segment.size for internal topics.

3. I believe there's a potential bug where compaction can happen
before a worker reads a tombstone, leading the KafkaBasedLog to
produce inconsistent in-memory states across multiple workers. Since
the segment.size is so large, it makes me think that compaction has
been wholly ineffective so far, and has prevented this bug from
manifesting. By lowering the segment.size, we're increasing the
likelihood of this failure, so it may need to finally be addressed.
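
One possible reading of point 3, sketched as a toy model: a worker that has consumed a key's value but not yet its tombstone never sees the delete if the cleaner removes the tombstone first. This is not KafkaBasedLog or the real log cleaner. All names are hypothetical, and the model collapses compaction and tombstone removal into one step, whereas a real broker retains tombstones for a configurable period before dropping them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a compacted topic and two readers; not Kafka code.
final class TombstoneRaceSketch {
    /** One log record; a null value marks a tombstone. */
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Toy compaction: keep only the latest record per key, then drop tombstones. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new HashMap<>();
        for (Rec r : log) {
            latest.put(r.key, r);
        }
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) {
            if (latest.get(r.key) == r && r.value != null) {
                out.add(r);
            }
        }
        return out;
    }

    /** Applies every record at or after fromOffset to a worker's in-memory view. */
    static void read(List<Rec> log, long fromOffset, Map<String, String> state) {
        for (Rec r : log) {
            if (r.offset < fromOffset) {
                continue;
            }
            if (r.value == null) {
                state.remove(r.key);
            } else {
                state.put(r.key, r.value);
            }
        }
    }

    /** Returns true when the two simulated workers end up with different views. */
    static boolean statesDiverge() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec(0, "connector-a", "offset=10"));
        log.add(new Rec(1, "connector-b", "offset=7"));
        log.add(new Rec(2, "connector-a", null)); // tombstone for connector-a

        // Fast worker reads the whole log before any cleaning happens.
        Map<String, String> fast = new HashMap<>();
        read(log, 0, fast);

        // Slow worker has consumed offsets 0..1 when the cleaner runs,
        // then resumes at offset 2 on the compacted log.
        Map<String, String> slow = new HashMap<>();
        read(log.subList(0, 2), 0, slow);
        read(compact(log), 2, slow);

        return !fast.equals(slow);
    }

    public static void main(String[] args) {
        System.out.println("in-memory states diverge: " + statesDiverge());
    }
}
```

In this model the slow worker still holds connector-a while the fast worker has removed it. A smaller segment size makes more of the log eligible for cleaning sooner, which is why the window for this kind of race could widen.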

Thanks,
Greg




On Thu, Jul 6, 2023 at 5:39 AM Yash Mayya <ya...@gmail.com> wrote:
>
> Also, just adding to the above point - we don't necessarily need to
> explicitly add a new worker configuration right? Instead, we could
> potentially just use the new proposed default value internally which can be
> overridden by users through setting a value for
> "offset.storage.segment.bytes" (via the existing KIP-605 based mechanism).
>
> On Thu, Jul 6, 2023 at 6:04 PM Yash Mayya <ya...@gmail.com> wrote:
>
> > Hi hudeqi,
> >
> > Thanks for the KIP! Just to clarify - since KIP-605 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings)
> > already allows configuring "segment.bytes" for the Connect cluster's
> > offsets topic via a worker configuration ("offset.storage.segment.bytes",
> > same as what is being proposed in this KIP), the primary motivation of this
> > KIP is essentially to override the default value for that topic
> > configuration to a smaller value and decouple it from the backing Kafka
> > cluster's "log.segment.bytes" configuration? Also, I'm curious about how
> > the new default value of 50 MB was chosen (if there were any experiments
> > that were run etc.)?
> >
> > Thanks,
> > Yash
> >
> > On Mon, Jul 3, 2023 at 6:08 PM hudeqi <16...@bjtu.edu.cn> wrote:
> >
> >> Is anyone following this KIP? Bump this thread.
> >
> >

Re: Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by Sagar <sa...@gmail.com>.
Hey Hudeqi,

I took some time to read through the PR link as well where you and Chris
had an informative discussion.

I think even over there and in this discussion thread, it seems to me that
the consensus is to reduce the scope of the KIP to lowering the default value
of the segment.bytes config for the offsets topic. This should give future
workers a shorter boot-up time. IMO while this might not seem like a
high impact thing, the configs that we are talking about here are advanced
ones which new users for Connect might not immediately look into. So, if
they end up in a situation where there's a 23-min worker startup time, then
it might not be an overall good experience for them.

Regarding the point Greg mentioned, we will have to think about getting
around it. The approach you suggested seems unclean to me. Since you have
been testing with this config in your cluster and you already have a large
offsets topic, in your experience have you noticed any discrepancies of the
in-memory states across workers in your cluster? Would it be possible for
you to test that? That might be a good starting point to understand how we
want to fix this. Ideally we should have some kind of a point of view (or
even a potential fix) on this before we go about implementing this change.
WDYT?

Thanks!
Sagar.

On Mon, Aug 14, 2023 at 6:09 PM hudeqi <16...@bjtu.edu.cn> wrote:

> bump this discuss thread.
>
> best,
> hudeqi
>
> "hudeqi" <16120374@bjtu.edu.cn> wrote:
> > Sorry for getting back so late, Yash Mayya, Greg Harris, Sagar. I was
> > not getting email reminders and missed your replies.
> >
> > Thank you for your thoughts and suggestions; I learned a lot. I will
> > give my thoughts and answers together:
> > 1. The default of 50MB is the configuration I actually use in
> > production to solve this problem, and it works well (see the
> > description of the Jira:
> > https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15086?filter=allopenissues).
> > In fact, I think an even smaller value may be better, which is why I
> > did not adopt the default used by __consumer_offsets, but I don't know
> > which default value is best. Secondly, in production I also set the
> > 50MB default through ConfigDef#defineInternal, and if the value
> > configured by the user is greater than the default, a warning is
> > logged. The only difference from what you described is that I
> > overwrote the user-configured value with the default (this point was
> > rejected by Chris Egerton: https://github.com/apache/kafka/pull/13852;
> > you all agree that the user-configured value should not be overridden
> > directly, and now I agree with this).
> > 2. I think the potential bug Greg mentioned, which may lead to
> > inconsistent state between workers, is a great point. It is true that
> > we cannot directly change the configuration of existing internal
> > topics. Perhaps a trickier and uglier approach is to manually check
> > that the active segments of all current partitions are relatively
> > small, then stop all Connect instances, change the topic
> > configuration, and finally start the instances again.
> >
> > To sum up, I wonder whether the scope of the KIP could be reduced to:
> > only set the default value of 'segment.bytes' for the internal topics,
> > and log a warning when the user configures a bigger value. What do you
> > think? If there's a better way, I'm all ears.
> >
> > best,
> > hudeqi
>

Re: Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by hudeqi <16...@bjtu.edu.cn>.
bump this discuss thread.

best,
hudeqi

"hudeqi" <16120374@bjtu.edu.cn> wrote:
> Sorry for getting back so late, Yash Mayya, Greg Harris, Sagar. I was not getting email reminders and missed your replies.
>
> Thank you for your thoughts and suggestions; I learned a lot. I will give my thoughts and answers together:
> 1. The default of 50MB is the configuration I actually use in production to solve this problem, and it works well (see the description of the Jira: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15086?filter=allopenissues). In fact, I think an even smaller value may be better, which is why I did not adopt the default used by __consumer_offsets, but I don't know which default value is best. Secondly, in production I also set the 50MB default through ConfigDef#defineInternal, and if the value configured by the user is greater than the default, a warning is logged. The only difference from what you described is that I overwrote the user-configured value with the default (this point was rejected by Chris Egerton: https://github.com/apache/kafka/pull/13852; you all agree that the user-configured value should not be overridden directly, and now I agree with this).
> 2. I think the potential bug Greg mentioned, which may lead to inconsistent state between workers, is a great point. It is true that we cannot directly change the configuration of existing internal topics. Perhaps a trickier and uglier approach is to manually check that the active segments of all current partitions are relatively small, then stop all Connect instances, change the topic configuration, and finally start the instances again.
>
> To sum up, I wonder whether the scope of the KIP could be reduced to: only set the default value of 'segment.bytes' for the internal topics, and log a warning when the user configures a bigger value. What do you think? If there's a better way, I'm all ears.
> 
> best,
> hudeqi

Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by hudeqi <16...@bjtu.edu.cn>.
Sorry for getting back so late, Yash Mayya, Greg Harris, Sagar. I was not getting email reminders and missed your replies.

Thank you for your thoughts and suggestions; I learned a lot. I will give my thoughts and answers together:
1. The default of 50MB is the configuration I actually use in production to solve this problem, and it works well (see the description of the Jira: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15086?filter=allopenissues). In fact, I think an even smaller value may be better, which is why I did not adopt the default used by __consumer_offsets, but I don't know which default value is best. Secondly, in production I also set the 50MB default through ConfigDef#defineInternal, and if the value configured by the user is greater than the default, a warning is logged. The only difference from what you described is that I overwrote the user-configured value with the default (this point was rejected by Chris Egerton: https://github.com/apache/kafka/pull/13852; you all agree that the user-configured value should not be overridden directly, and now I agree with this).
2. I think the potential bug Greg mentioned, which may lead to inconsistent state between workers, is a great point. It is true that we cannot directly change the configuration of existing internal topics. Perhaps a trickier and uglier approach is to manually check that the active segments of all current partitions are relatively small, then stop all Connect instances, change the topic configuration, and finally start the instances again.

To sum up, I wonder whether the scope of the KIP could be reduced to: only set the default value of 'segment.bytes' for the internal topics, and log a warning when the user configures a bigger value. What do you think? If there's a better way, I'm all ears.
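
The reduced scope summarized above (warn, but never overwrite) could look roughly like the following. This is only a sketch: the class name, the message wording, and the use of 50 MiB as the threshold are assumptions, and the thread has not settled on a value.

```java
import java.util.Optional;

// Sketch only; the threshold and message wording are assumptions.
final class SegmentBytesWarningSketch {
    static final long RECOMMENDED_SEGMENT_BYTES = 50L * 1024 * 1024;

    /**
     * Returns a warning when the topic's segment.bytes is larger than the
     * recommended value. The configured value itself is never modified.
     */
    static Optional<String> check(String topic, long configuredSegmentBytes) {
        if (configuredSegmentBytes <= RECOMMENDED_SEGMENT_BYTES) {
            return Optional.empty();
        }
        return Optional.of(String.format(
            "Topic '%s' has segment.bytes=%d, above the recommended %d; worker startup may be slow.",
            topic, configuredSegmentBytes, RECOMMENDED_SEGMENT_BYTES));
    }

    public static void main(String[] args) {
        // A 1 GiB segment size triggers the warning.
        check("connect-offsets", 1073741824L).ifPresent(System.err::println);
        // A value at the threshold produces no output.
        check("connect-offsets", 52428800L).ifPresent(System.err::println);
    }
}
```

Returning the message instead of logging it directly keeps the check easy to test; a worker would pass a non-empty result to its logger at WARN level.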

best,
hudeqi

Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by Yash Mayya <ya...@gmail.com>.
Also, just adding to the above point - we don't necessarily need to
explicitly add a new worker configuration right? Instead, we could
potentially just use the new proposed default value internally which can be
overridden by users through setting a value for
"offset.storage.segment.bytes" (via the existing KIP-605 based mechanism).

On Thu, Jul 6, 2023 at 6:04 PM Yash Mayya <ya...@gmail.com> wrote:

> Hi hudeqi,
>
> Thanks for the KIP! Just to clarify - since KIP-605 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings)
> already allows configuring "segment.bytes" for the Connect cluster's
> offsets topic via a worker configuration ("offset.storage.segment.bytes",
> same as what is being proposed in this KIP), the primary motivation of this
> KIP is essentially to override the default value for that topic
> configuration to a smaller value and decouple it from the backing Kafka
> cluster's "log.segment.bytes" configuration? Also, I'm curious about how
> the new default value of 50 MB was chosen (if there were any experiments
> that were run etc.)?
>
> Thanks,
> Yash
>
> On Mon, Jul 3, 2023 at 6:08 PM hudeqi <16...@bjtu.edu.cn> wrote:
>
>> Is anyone following this KIP? Bump this thread.
>
>

Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by Yash Mayya <ya...@gmail.com>.
Hi hudeqi,

Thanks for the KIP! Just to clarify - since KIP-605 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings)
already allows configuring "segment.bytes" for the Connect cluster's
offsets topic via a worker configuration ("offset.storage.segment.bytes",
same as what is being proposed in this KIP), the primary motivation of this
KIP is essentially to override the default value for that topic
configuration to a smaller value and decouple it from the backing Kafka
cluster's "log.segment.bytes" configuration? Also, I'm curious about how
the new default value of 50 MB was chosen (if there were any experiments
that were run etc.)?
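
As a rough illustration of the override path described here, the sketch below seeds a smaller internal default and lets an explicit "offset.storage.segment.bytes" worker property replace it. It is a simplification, not Connect's implementation: the class name is hypothetical, 50 MB is read as 50 MiB, and the three excluded suffixes are an assumption about which prefixed properties are worker settings rather than topic configs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified sketch; this is not Connect's actual implementation.
final class OffsetTopicSettingsSketch {
    static final String PREFIX = "offset.storage.";
    // Worker properties that share the prefix but are not topic-level configs (assumed).
    static final Set<String> NOT_TOPIC_CONFIGS = Set.of("topic", "partitions", "replication.factor");
    // 50 MB default floated in the KIP, read here as 50 MiB (an assumption).
    static final long DEFAULT_SEGMENT_BYTES = 50L * 1024 * 1024;

    /** Topic-level configs to request when the offsets topic is first created. */
    static Map<String, String> topicSettings(Map<String, String> workerProps) {
        Map<String, String> settings = new HashMap<>();
        // Seed the internal default first so an explicit worker property wins.
        settings.put("segment.bytes", Long.toString(DEFAULT_SEGMENT_BYTES));
        for (Map.Entry<String, String> e : workerProps.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                continue;
            }
            String name = e.getKey().substring(PREFIX.length());
            if (!NOT_TOPIC_CONFIGS.contains(name)) {
                settings.put(name, e.getValue());
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("offset.storage.topic", "connect-offsets");
        // No explicit setting: the internal default is used.
        System.out.println(topicSettings(props).get("segment.bytes"));

        props.put("offset.storage.segment.bytes", "104857600");
        // Explicit worker property: the user's value wins.
        System.out.println(topicSettings(props).get("segment.bytes"));
    }
}
```

Because the result is only used when the topic is created, the broker's "log.segment.bytes" no longer decides the segment size for a new offsets topic, while existing topics are left untouched.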

Thanks,
Yash

On Mon, Jul 3, 2023 at 6:08 PM hudeqi <16...@bjtu.edu.cn> wrote:

> Is anyone following this KIP? Bump this thread.

Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

Posted by hudeqi <16...@bjtu.edu.cn>.
Is anyone following this KIP? Bump this thread.