Posted to dev@kafka.apache.org by Dong Lin <li...@gmail.com> on 2014/08/06 06:25:48 UTC

Re: Review Request 23568: Patch for KAFKA-1523

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/
-----------------------------------------------------------

(Updated Aug. 6, 2014, 4:25 a.m.)


Review request for kafka.


Bugs: KAFKA-1523
    https://issues.apache.org/jira/browse/KAFKA-1523


Repository: kafka


Description (updated)
-------

KAFKA-1523 Transaction manager and its failover handling.


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/TopicCommand.scala 003a09c6160618bc94858ebc0d806b2aa4158e0a 
  core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 
  core/src/main/scala/kafka/message/Message.scala d2a7293c7be4022af30884330924791340acc5c1 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/KafkaConfig.scala 1a45f8716ccc0398cf9395d91d66199d16882aae 
  core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d 
  core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/server/TransactionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511 

Diff: https://reviews.apache.org/r/23568/diff/


Testing
-------


Thanks,

Dong Lin


Re: Review Request 23568: Patch for KAFKA-1523

Posted by Dong Lin <li...@gmail.com>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.

Sure. Thanks for taking time to help think about it.

Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?
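
To make the arithmetic concrete, here is a minimal Python sketch (an illustration only, not the Scala code in the patch). It assumes each id fetch bumps a ZooKeeper version and that the fetching broker then owns the txids from version * batchSize up to, but not including, (version + 1) * batchSize, which matches the 1000 to 1999 and 2000 to 2999 example. The name txid_range is hypothetical.

```python
def txid_range(zk_version: int, batch_size: int) -> range:
    """Return the txids owned by whoever obtained `zk_version` (assumed scheme)."""
    if zk_version < 0 or batch_size <= 0:
        raise ValueError("version must be >= 0 and batch size must be > 0")
    start = zk_version * batch_size
    return range(start, start + batch_size)


# Two consecutive fetches with the same batch size of 1000.
first = txid_range(1, 1000)   # 1000..1999
second = txid_range(2, 1000)  # 2000..2999

# With a fixed batch size, ranges for different versions share no txid.
no_shared_ids = set(first).isdisjoint(second)
```

As long as every broker uses the same batch size, two different versions can never map to the same txid under this scheme.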


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Dong Lin <li...@gmail.com>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.
> 
> Dong Lin wrote:
>     Sure. Thanks for taking time to help think about it.
>     
>     Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?

Correction: I actually mean broker where I say producer in my response.


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Timothy Chen <tn...@apache.org>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.
> 
> Dong Lin wrote:
>     Sure. Thanks for taking time to help think about it.
>     
>     Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?
> 
> Dong Lin wrote:
>     Correction: I actually mean broker where I say producer in my response.
> 
> Timothy Chen wrote:
>     The next producer will do so by looking at the default batch size together with the version, which is why I was thinking that nothing in the code really guards against the range being changed.
>     The safest thing to do is perhaps to write the range into the zk node data and log a big warning when the current range is smaller.
>     But at a minimum we will want to leave a comment in the code on the batch size carrying this warning, so that it isn't something people experiment with, as it can mess up existing transactions.
> 
> Dong Lin wrote:
>     It is the broker that looks at the default batch size together with the version and allocates a unique txId to the producer upon request. In other words, this code is on the server side, and we don't need to worry about a Kafka user changing it. Am I right?
> 
> Timothy Chen wrote:
>     You're right, we don't need to worry about it for the user, but we do need to worry about it when we change the default and do a rolling upgrade.
> 
> Dong Lin wrote:
>     Oh, I see your point. Sure, I agree that we should add a comment in the TransactionManager code so that any future developer is aware of the effect of changing DefaultTransactionsIdBatchSize. But since this is just one of many configuration values, I am not sure it is worth saving this default value in the zk node.
>     
>     Does that address your concern? Thanks!

Yes, I don't think we need to write it out; just a comment should be good IMO.


- Timothy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Dong Lin <li...@gmail.com>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.
> 
> Dong Lin wrote:
>     Sure. Thanks for taking time to help think about it.
>     
>     Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?
> 
> Dong Lin wrote:
>     Correction: I actually mean broker where I say producer in my response.
> 
> Timothy Chen wrote:
>     The next producer will do so by looking at the default batch size together with the version, which is why I was thinking that nothing in the code really guards against the range being changed.
>     The safest thing to do is perhaps to write the range into the zk node data and log a big warning when the current range is smaller.
>     But at a minimum we will want to leave a comment in the code on the batch size carrying this warning, so that it isn't something people experiment with, as it can mess up existing transactions.
> 
> Dong Lin wrote:
>     It is the broker that looks at the default batch size together with the version and allocates a unique txId to the producer upon request. In other words, this code is on the server side, and we don't need to worry about a Kafka user changing it. Am I right?
> 
> Timothy Chen wrote:
>     You're right, we don't need to worry about it for the user, but we do need to worry about it when we change the default and do a rolling upgrade.

Oh, I see your point. Sure, I agree that we should add a comment in the TransactionManager code so that any future developer is aware of the effect of changing DefaultTransactionsIdBatchSize. But since this is just one of many configuration values, I am not sure it is worth saving this default value in the zk node.

Does that address your concern? Thanks!


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Dong Lin <li...@gmail.com>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.
> 
> Dong Lin wrote:
>     Sure. Thanks for taking time to help think about it.
>     
>     Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?
> 
> Dong Lin wrote:
>     Correction: I actually mean broker where I say producer in my response.
> 
> Timothy Chen wrote:
>     The next producer will do so by looking at the default batch size together with the version, which is why I was thinking that nothing in the code really guards against the range being changed.
>     The safest thing to do is perhaps to write the range into the zk node data and log a big warning when the current range is smaller.
>     But at a minimum we will want to leave a comment in the code on the batch size carrying this warning, so that it isn't something people experiment with, as it can mess up existing transactions.

It is the broker that looks at the default batch size together with the version and allocates a unique txId to the producer upon request. In other words, this code is on the server side, and we don't need to worry about a Kafka user changing it. Am I right?


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Timothy Chen <tn...@apache.org>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.
> 
> Dong Lin wrote:
>     Sure. Thanks for taking time to help think about it.
>     
>     Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?
> 
> Dong Lin wrote:
>     Correction: I actually mean broker where I say producer in my response.

The next producer will do so by looking at the default batch size together with the version, which is why I was thinking that nothing in the code really guards against the range being changed.
The safest thing to do is perhaps to write the range into the zk node data and log a big warning when the current range is smaller.
But at a minimum we will want to leave a comment in the code on the batch size carrying this warning, so that it isn't something people experiment with, as it can mess up existing transactions.
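
The concern can be illustrated with a short Python sketch (an illustration only, not code from the patch). It assumes a range starts at version * batchSize. The helpers ranges_overlap and batch_size_is_safe are hypothetical; the latter stands in for the idea of keeping the batch size in the zk node data and warning when the configured value is smaller.

```python
import warnings


def txid_range(zk_version: int, batch_size: int) -> range:
    """Assumed scheme: a version owns [version * size, (version + 1) * size)."""
    start = zk_version * batch_size
    return range(start, start + batch_size)


def ranges_overlap(a: range, b: range) -> bool:
    """True if the two half-open ranges share at least one txid."""
    return a.start < b.stop and b.start < a.stop


def batch_size_is_safe(stored_batch_size: int, configured_batch_size: int) -> bool:
    """Compare the size recorded in the zk node data with the broker's own size.

    Emits a warning and returns False when the configured size is smaller,
    since newly allocated ranges could then fall into already-used territory.
    """
    if configured_batch_size < stored_batch_size:
        warnings.warn(
            f"txid batch size shrank from {stored_batch_size} to "
            f"{configured_batch_size}; new ranges may overlap existing ones"
        )
        return False
    return True


old_range = txid_range(1, 1000)  # 1000..1999, allocated before failover
new_range = txid_range(2, 500)   # 1000..1499, allocated with a smaller size
collides = ranges_overlap(old_range, new_range)
```

In this example the version did increase, but the smaller batch size pulls the new range back onto ids that were already handed out.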


- Timothy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Timothy Chen <tn...@apache.org>.

> On Aug. 6, 2014, 5:32 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.
> 
> Dong Lin wrote:
>     Sure. Thanks for taking time to help think about it.
>     
>     Here I assume the batch size should be the same across producers regardless of when they start. If a producer sends 999 messages, say with txid in range 1000 to 1999, and fails over, the next producer will get the txid in range 2000 to 2999. The range is guaranteed to be disjoint from ranges used by previous/existing producers. Does this make sense?
> 
> Dong Lin wrote:
>     Correction: I actually mean broker where I say producer in my response.
> 
> Timothy Chen wrote:
>     The next producer will do so by looking at the default batch size together with the version, which is why I was thinking that nothing in the code really guards against the range being changed.
>     The safest thing to do is perhaps to write the range into the zk node data and log a big warning when the current range is smaller.
>     But at a minimum we will want to leave a comment in the code on the batch size carrying this warning, so that it isn't something people experiment with, as it can mess up existing transactions.
> 
> Dong Lin wrote:
>     It is the broker that looks at the default batch size together with the version and allocates a unique txId to the producer upon request. In other words, this code is on the server side, and we don't need to worry about a Kafka user changing it. Am I right?

You're right, we don't need to worry about it for the user, but we do need to worry about it when we change the default and do a rolling upgrade.
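
A rolling upgrade can be sketched as brokers with different defaults taking turns fetching ids from the same version counter. The Python below is only an illustration under the assumption that a range starts at version * batchSize; allocate and overlapping_pairs are hypothetical names, not part of the patch. Under that assumption, even raising the default can collide while old and new brokers are both running.

```python
def txid_range(zk_version: int, batch_size: int) -> range:
    """Assumed scheme: a version owns [version * size, (version + 1) * size)."""
    start = zk_version * batch_size
    return range(start, start + batch_size)


def allocate(batch_sizes: list) -> list:
    """Hand out one range per id fetch.

    batch_sizes[i] is the batch size of the broker making fetch i; the shared
    version counter starts at 1 and increases by one per fetch.
    """
    return [txid_range(version, size)
            for version, size in enumerate(batch_sizes, start=1)]


def overlapping_pairs(ranges: list) -> list:
    """Return every pair of allocated ranges that shares at least one txid."""
    pairs = []
    for i, a in enumerate(ranges):
        for b in ranges[i + 1:]:
            if a.start < b.stop and b.start < a.stop:
                pairs.append((a, b))
    return pairs


# All brokers on the old default: no collisions.
uniform = overlapping_pairs(allocate([1000, 1000, 1000, 1000]))

# Mid-upgrade: the second fetch comes from a broker with a new default of 2000,
# the others from brokers still running the old default of 1000.
mixed = overlapping_pairs(allocate([1000, 2000, 1000, 1000]))
```

Here the upgraded broker's range for version 2 (4000 to 5999) collides with an old broker's later range for version 4 (4000 to 4999), which is the kind of hazard a comment on the default should call out.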


- Timothy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Timothy Chen <tn...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49696
-----------------------------------------------------------



core/src/main/scala/kafka/server/TransactionManager.scala
<https://reviews.apache.org/r/23568/#comment86986>

    I'm just trying to think whether there is a chance of overlapping tx ids, especially around failover. I'm not familiar with the tx changes, so I'm just wondering whether the range size can cause issues. It seems you're incrementing the zk version on each id fetch and allowing a range of batch-size ids for each version. Suppose I sent 999 messages and then failed over, and the new broker that took over now has a smaller batch size: even though the version has been incremented, the new range overlaps the old one, right? It might already be handled, but I just want to throw it out there.


- Timothy Chen




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Dong Lin <li...@gmail.com>.

> On Aug. 6, 2014, 4:39 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 300
> > <https://reviews.apache.org/r/23568/diff/6/?file=653418#file653418line300>
> >
> >     If the batch size changes to a smaller number does the tx Id range overlap then?

Why would it? The tx Id range should not overlap, regardless of the actual batch size.
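
One way such a guarantee can hold is sketched below, under assumptions and not taken from the patch: if the shared store keeps the next unallocated id (rather than a counter that is later multiplied by the batch size), each reservation starts where the previous one ended, so the batch size can change freely. The class TxIdAllocator is hypothetical, and an AtomicLong stands in for a ZooKeeper znode updated with a conditional write.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.annotation.tailrec

// Hypothetical sketch; the AtomicLong plays the role of the shared,
// conditionally updated store that survives a broker failover.
final class TxIdAllocator(store: AtomicLong) {
  /** Reserves `batchSize` consecutive ids and returns the inclusive range. */
  @tailrec
  def nextRange(batchSize: Long): (Long, Long) = {
    require(batchSize > 0, "batchSize must be positive")
    val start = store.get()
    val end = start + batchSize
    // Compare-and-set: only succeeds if nobody else reserved ids in between.
    if (store.compareAndSet(start, end)) (start, end - 1)
    else nextRange(batchSize) // lost the race; retry from the new value
  }
}
```

With this scheme, a broker that reserves 1000 ids gets 0 to 999, and a new broker that takes over with a batch size of 100 gets 1000 to 1099. Whether TransactionManager.scala works this way is not stated in this thread.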


> On Aug. 6, 2014, 4:39 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/admin/TopicCommand.scala, line 109
> > <https://reviews.apache.org/r/23568/diff/6/?file=653408#file653408line109>
> >
> >     Why not use the InternalTopics array here?

Thanks! This is a good point, and I will use InternalTopics in the updated patch.


> On Aug. 6, 2014, 4:39 a.m., Timothy Chen wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 255
> > <https://reviews.apache.org/r/23568/diff/6/?file=653413#file653413line255>
> >
> >     that is quite a mouthful, should it be called just producerTxRequestToCoordinator?

I agree, it is quite a mouthful. When naming the functions producerRequestFromTxRequestToBroker and producerRequestFromTxRequestToBroker, I just wanted to be consistent with the name producerRequestFromOffsetCommit.


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49692
-----------------------------------------------------------




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Timothy Chen <tn...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review49692
-----------------------------------------------------------



core/src/main/scala/kafka/admin/TopicCommand.scala
<https://reviews.apache.org/r/23568/#comment86976>

    Why not use the InternalTopics array here?



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/23568/#comment86978>

    that is quite a mouthful, should it be called just producerTxRequestToCoordinator?



core/src/main/scala/kafka/server/TransactionManager.scala
<https://reviews.apache.org/r/23568/#comment86979>

    If the batch size changes to a smaller number does the tx Id range overlap then?


- Timothy Chen




Re: Review Request 23568: Patch for KAFKA-1523

Posted by Dong Lin <li...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/
-----------------------------------------------------------

(Updated Aug. 9, 2014, 4:36 a.m.)


Review request for kafka.


Bugs: KAFKA-1523
    https://issues.apache.org/jira/browse/KAFKA-1523


Repository: kafka


Description (updated)
-------

KAFKA-1523; Transaction manager and its failover handling


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/TopicCommand.scala 003a09c6160618bc94858ebc0d806b2aa4158e0a 
  core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 
  core/src/main/scala/kafka/message/Message.scala d2a7293c7be4022af30884330924791340acc5c1 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/KafkaConfig.scala 1a45f8716ccc0398cf9395d91d66199d16882aae 
  core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d 
  core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/server/TransactionManager.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23568/diff/


Testing
-------


Thanks,

Dong Lin