Posted to dev@kafka.apache.org by Ning Zhang <ni...@gmail.com> on 2020/09/03 04:51:13 UTC

Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

Bumping this thread for another round of discussion.

On 2020/08/27 23:31:38, Ning Zhang <ni...@gmail.com> wrote: 
> Hello Mickael,
> 
> > 1. How does offset translation work with this new sink connector?
> > Should we also include a CheckpointSinkConnector?
> 
> CheckpointSourceConnector will be re-used as it is today. When EOS is enabled, we will run 3 connectors:
> 
> MirrorSinkConnector (based on SinkConnector)
> MirrorCheckpointConnector (based on SourceConnector)
> MirrorHeartbeatConnector (based on SourceConnector)
> 
> For the last two connectors (checkpoint, heartbeat), if we do not strictly require EOS, it is probably OK to keep the current implementation based on SourceConnector.
> 
> I will update the KIP to clarify this, if it sounds acceptable.
> 
> > 2. Migrating to this new connector could be tricky as effectively the
> > Connect runtime needs to point to the other cluster, so its state
> > (stored in the __connect topics) is lost. Unfortunately there is no
> > easy way today to prime Connect with offsets. Not necessarily a
> > blocking issue but this should be described as I think the current
> > Migration section looks really optimistic at the moment
> 
> Totally agree. I will update the migration section with notes about the potential for service interruption if the migration is not carefully planned.
> 
> > 3. We can probably find a better name than "transaction.producer".
> > Maybe we can follow a similar pattern to Streams (which uses
> > "processing.guarantee")?
> 
> "processing.guarantee" sounds better
> 
> > 4. Transactional Ids used by the producer are generated based on the
> > task assignments. If there's a single task, if it crashes and restarts
> > it would still get the same id. Can this be an issue?
> 
> In https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html, the author suggests suffixing the transactional.id with <topic, partition>:
> 
> "To avoid handling an external store we will use a static encoding similarly as in spring-kafka:
> The transactional.id is now the transactionIdPrefix appended with <group.id>.<topic>.<partition>."
> 
> I think it is OK as long as no more than one producer uses the same "transactional.id" at the same time.
> 
> Also, in my tests this "transactional.id" assignment works fine across failures. To tighten it up, I also tested using the "connector task id" in the "transactional.id". The "connector task id" is typically composed of connector_name and task_id, and is unique across all connectors in a KC cluster.
> 
> > 5. The logic in the KIP creates a new transaction every time put() is
> > called. Is there a performance impact?
> 
> It could be a performance hit if the transaction batch is too small under a high ingestion rate. The batch size depends on how many messages the consumer polls each time. Maybe we could increase "max.poll.records" to get a larger batch size.
> 
> Overall, thanks so much for the valuable feedback. If the responses sound good, I will clean up the KIP.
> 
> On 2020/08/27 09:59:57, Mickael Maison <mi...@gmail.com> wrote: 
> > Thanks Ning for the KIP. Having stronger guarantees when mirroring
> > data would be a nice improvement!
> > 
> > A few comments:
> > 1. How does offset translation work with this new sink connector?
> > Should we also include a CheckpointSinkConnector?
> > 
> > 2. Migrating to this new connector could be tricky as effectively the
> > Connect runtime needs to point to the other cluster, so its state
> > (stored in the __connect topics) is lost. Unfortunately there is no
> > easy way today to prime Connect with offsets. Not necessarily a
> > blocking issue but this should be described as I think the current
> > Migration section looks really optimistic at the moment
> > 
> > 3. We can probably find a better name than "transaction.producer".
> > Maybe we can follow a similar pattern to Streams (which uses
> > "processing.guarantee")?
> > 
> > 4. Transactional Ids used by the producer are generated based on the
> > task assignments. If there's a single task, if it crashes and restarts
> > it would still get the same id. Can this be an issue?
> > 
> > 5. The logic in the KIP creates a new transaction every time put() is
> > called. Is there a performance impact?
> > 
> > On Fri, Aug 21, 2020 at 4:58 PM Ryanne Dolan <ry...@gmail.com> wrote:
> > >
> > > Awesome, this will be a huge advancement. I also want to point out that
> > > this KIP implements MirrorSinkConnector as well, finally, which is a very
> > > often requested missing feature in my experience.
> > >
> > > Ryanne
> > >
> > > On Fri, Aug 21, 2020, 9:45 AM Ning Zhang <ni...@gmail.com> wrote:
> > >
> > > > Hello, I wrote a KIP about MirrorMaker2 Exactly-once Semantics (EOS)
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-656%3A+MirrorMaker2+Exactly-once+Semantics
> > > > At a high level, it resembles how the HDFS Sink Connector
> > > > achieves EOS across clusters by managing and storing the consumer offsets
> > > > in external persistent storage, but it also leverages the current Kafka EOS
> > > > guarantee within a single cluster. I have done some experiments, especially
> > > > for the failure cases, and I would very much appreciate comments and feedback
> > > > on this KIP from a bigger audience.
> > > >
> > 
> 
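As a rough illustration of the transactional id scheme discussed under point 4 above, the sketch below derives one id per connector task. The class name "TransactionalIds", the method "forTask" and the prefix value are hypothetical and are not taken from the KIP or from Kafka; the only assumption carried over from the thread is that connector name plus task id is unique within a Connect cluster.

```java
import java.util.Objects;

/** Hypothetical helper for illustration only; not part of Kafka or KIP-656. */
final class TransactionalIds {
    private TransactionalIds() {}

    /**
     * Builds "<prefix><connectorName>-<taskId>".
     * The same task always maps to the same id, and two different
     * tasks of the same connector never share one.
     */
    static String forTask(String prefix, String connectorName, int taskId) {
        Objects.requireNonNull(prefix, "prefix");
        Objects.requireNonNull(connectorName, "connectorName");
        if (taskId < 0) {
            throw new IllegalArgumentException("taskId must be >= 0");
        }
        return prefix + connectorName + "-" + taskId;
    }
}
```

With an id built this way, a task that crashes and restarts gets the same id back, which is what allows the broker to fence the older producer instance once the new one initializes its transactions.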

Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

Posted by Ning Zhang <ni...@gmail.com>.
Bump up for another round of discussion.

To follow up on the last question raised, about running a Source connector (MirrorCheckpoint) and a Sink connector (MirrorSinkConnector) simultaneously in MM2: in my testing, both function well, and mixing Source and Sink connectors does not seem to introduce significant degradation or complexity.

If there is a reason that we have to "fork" MirrorCheckpoint as a Sink connector (e.g. duplicate most of the codebase), I am happy to look into whether that is necessary in another round.

Thanks

On 2020/09/03 15:06:00, Mickael Maison <mi...@gmail.com> wrote: 
> Hi Ning,
> 
> Thanks for the updates.
> 
> 1. If you have to run a Sink (the new MirrorSinkConnector) and Source
> (MirrorCheckpoint) connector for MM2 you will need 2 Connect runtimes.
> So this does not work well for users of Connect. I've not really
> looked into it yet but I wonder if we should include a Sink connector
> for checkpoints too
> 

Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

Posted by Ning Zhang <ni...@gmail.com>.
Hi Mickael,

We can definitely include a Sink connector for checkpoints and even heartbeats. However, I am wondering whether the 3 existing connectors (MirrorSourceConnector, MirrorCheckpointConnector and MirrorHeartbeatConnector) could continue to be managed separately, so that we reduce the footprint of introducing EOS while maintaining the correctness of Checkpoint and Heartbeat, given that checkpoint and heartbeat are very lightweight connectors in terms of traffic load and logic. Maybe Ryanne can chime in and share his thoughts.

If, for any reason, we do need to include Sink connectors for checkpoints and heartbeats to get this KIP approved, I plan to create a PR that extracts the common functions of the data mirroring task, Checkpoint and Heartbeat from the existing

- MirrorSourceConnector.java
- MirrorSourceTask.java
- MirrorCheckpointConnector.java
- MirrorCheckpointTask.java
- MirrorHeartbeatConnector.java
- MirrorHeartbeatTask.java

into some "common" files, so that the common functions can be re-used by MirrorSinkConnector, MirrorSinkTask, etc. (based on Sink Connector and Sink Task):

- MirrorCheckpointConnCommon.java
- MirrorCheckpointTaskCommon.java
- MirrorConnectorCommon.java
- MirrorTaskCommon.java
- MirrorHeartbeatConnCommon.java
- MirrorHeartbeatTaskCommon.java

Thoughts?
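
To make the "common files" idea above a bit more concrete, here is a minimal sketch of one shared helper used by both a source-style and a sink-style task. All three class names are hypothetical stand-ins (they only echo the proposed MirrorTaskCommon.java), the tasks do not extend the real Connect classes, and the "<alias>.<topic>" renaming is assumed here to match the default MM2 remote topic naming.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shared helper; holds logic both task flavours need. */
final class MirrorTaskCommonSketch {
    private final String sourceClusterAlias;

    MirrorTaskCommonSketch(String sourceClusterAlias) {
        this.sourceClusterAlias = sourceClusterAlias;
    }

    /** Renames a source topic for the target cluster as "<alias>.<topic>". */
    String formatRemoteTopic(String topic) {
        return sourceClusterAlias + "." + topic;
    }
}

/** Stand-in for a source-style task: the framework pulls results via poll(). */
final class SourceStyleTaskSketch {
    private final MirrorTaskCommonSketch common;

    SourceStyleTaskSketch(MirrorTaskCommonSketch common) {
        this.common = common;
    }

    List<String> poll(List<String> sourceTopics) {
        List<String> out = new ArrayList<>();
        for (String topic : sourceTopics) {
            out.add(common.formatRemoteTopic(topic));
        }
        return out;
    }
}

/** Stand-in for a sink-style task: the framework pushes records via put(). */
final class SinkStyleTaskSketch {
    private final MirrorTaskCommonSketch common;
    private final List<String> written = new ArrayList<>();

    SinkStyleTaskSketch(MirrorTaskCommonSketch common) {
        this.common = common;
    }

    void put(List<String> sourceTopics) {
        for (String topic : sourceTopics) {
            written.add(common.formatRemoteTopic(topic));
        }
    }

    List<String> written() {
        return written;
    }
}
```

The point of the sketch is only that the renaming logic lives in one place, so the source-based and sink-based tasks cannot drift apart.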

On 2020/09/03 15:06:00, Mickael Maison <mi...@gmail.com> wrote: 
> Hi Ning,
> 
> Thanks for the updates.
> 
> 1. If you have to run a Sink (the new MirrorSinkConnector) and Source
> (MirrorCheckpoint) connector for MM2 you will need 2 Connect runtimes.
> So this does not work well for users of Connect. I've not really
> looked into it yet but I wonder if we should include a Sink connector
> for checkpoints too
> 

Re: [DISCUSS] KIP-656: MirrorMaker2 Exactly-once Semantics

Posted by Mickael Maison <mi...@gmail.com>.
Hi Ning,

Thanks for the updates.

1. If you have to run a Sink (the new MirrorSinkConnector) and Source
(MirrorCheckpoint) connector for MM2 you will need 2 Connect runtimes.
So this does not work well for users of Connect. I've not really
looked into it yet but I wonder if we should include a Sink connector
for checkpoints too
