You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Jun Rao <ju...@gmail.com> on 2015/02/28 17:43:21 UTC

Re: Review Request 25995: Patch for KAFKA-1650

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review74677
-----------------------------------------------------------


Sorry for the late review. A few more comments below.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment121347>

    Isn't NumUnackedMessages the same as UnackedOffsetListSize since they are always modified together?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment121348>

    It's kind of weird that we only catch OOME here. OOME can be thrown in other places as well.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment121349>

    I don't quite follow the comment on synchronization. Once producer.send is called, the callback can be called anytime. In the callback, we will remove the offset from unackedOffsets. However, the removal is not synchronized on unackedOffsets.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment121350>

    Should we synchronize on the removal as well?


- Jun Rao


On Dec. 24, 2014, 12:44 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 24, 2014, 12:44 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
> 	core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> 	core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some change in the prvevious patch submission, submit patch again.
> 
> 
> change offset commit thread to use scheduler.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> set acks=-1 if --no.data.loss is specified.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
>   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/UtilsTest.scala 066553cad290c3d3821537a964c7d713c122d9fc 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 25995: Patch for KAFKA-1650

Posted by Jiangjie Qin <be...@gmail.com>.

> On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> > Sorry for the late review. A few more comments below.

Thanks a lot for the review, Jun. We actually have a new design for mirror maker based on the flush() call of producer (KAFKA-1865). The design is updated in KIP-3 and it is in voting process. It would be great if you can take a look at that. We plan to refactor mirror maker after the flush() call is checked in. But really appreciate the review and please see the answers below.


> On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 608-616
> > <https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line608>
> >
> >     I don't quite follow the comment on synchronization. Once producer.send is called, the callback can be called anytime. In the callback, we will remove the offset from unackedOffsets. However, the removal is not synchronized on unackedOffsets.

Yes... That's my bad. I somehow missed putting synchronization on the removal... I realized this after we decided to move to the new design using flush(), so I did not submit follow up patch.


> On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 729-731
> > <https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line729>
> >
> >     Should we synchronize on the removal as well?

Please see above.


> On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 580-582
> > <https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line580>
> >
> >     It's kind of weird that we only catch OOME here. OOME can be thrown in other places as well.

We used to catch all the throwables and exit, but later on we thought it might be better to just exit on fatal exception and ignore the other exceptions to let offset commit thread move on. So we fail on one commit but the next commit might succeed.


> On Feb. 28, 2015, 4:43 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 81-92
> > <https://reviews.apache.org/r/25995/diff/21/?file=799841#file799841line81>
> >
> >     Isn't NumUnackedMessages the same as UnackedOffsetListSize since they are always modified together?

Good point! We probably should just use the UnackedOffsetListSize. It is more clear.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review74677
-----------------------------------------------------------


On Dec. 24, 2014, 12:44 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 24, 2014, 12:44 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
> 	core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> 	core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some change in the prvevious patch submission, submit patch again.
> 
> 
> change offset commit thread to use scheduler.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> set acks=-1 if --no.data.loss is specified.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
>   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/UtilsTest.scala 066553cad290c3d3821537a964c7d713c122d9fc 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>