Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2014/12/03 19:57:43 UTC

Re: Review Request 25995: Patch for KAFKA-1650

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
-----------------------------------------------------------



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment105980>

    Could you add a comment here about the in-flight-request config and its effects?
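For reference, here is a minimal sketch of the kind of note being requested. It assumes the mirror maker's producer is the new Java producer, which exposes this setting as max.in.flight.requests.per.connection (alongside retries). InFlightConfigSketch and producerProps are hypothetical names, and the numeric values are only illustrative.

```scala
import java.util.Properties

// Hypothetical helper; only the property keys are real producer settings.
object InFlightConfigSketch {
  // Setting name used by the new Java producer.
  val MaxInFlightKey = "max.in.flight.requests.per.connection"

  def producerProps(preserveOrder: Boolean): Properties = {
    val props = new Properties()
    // Retries make reordering possible: if batch 1 fails and is retried while
    // batch 2 (already in flight on the same connection) succeeds, batch 2 is
    // written first. Allowing only one in-flight request per connection avoids
    // this at the cost of throughput.
    props.put("retries", "3") // illustrative value
    props.put(MaxInFlightKey, if (preserveOrder) "1" else "5") // "5" is illustrative
    props
  }
}
```

The trade-off is between per-partition ordering (one request in flight) and pipelining throughput (several requests in flight).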



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment105982>

    I am wondering whether there are scenarios in which we want to allow a customized rebalance listener. Also, if we decide to make this customizable, we need to make it clear that the customized listener is expected to take the data channel as a constructor argument, since this is not checked at compile time.
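A minimal sketch of why this contract is only checked at runtime. DataChannel here is a hypothetical stand-in for the mirror maker's data channel, the listener base class and its beforeReleasingPartitions hook are assumptions rather than the patch's API, and ListenerLoader is a hypothetical helper. It contrasts loading a listener by class name (as a config-driven listener would be) with a factory function, which is one possible compile-time-checked alternative and not something proposed in this thread.

```scala
import scala.util.Try

// Hypothetical stand-in for MirrorMaker's data channel (not the real class).
class DataChannel {
  private var queued = List.empty[String]
  def put(msg: String): Unit = synchronized { queued = msg :: queued }
  def clear(): Unit = synchronized { queued = Nil }
  def size: Int = synchronized { queued.size }
}

// Assumed base class; the hook below is illustrative, not the patch's API.
abstract class MirrorMakerConsumerRebalanceListener(val dataChannel: DataChannel) {
  def beforeReleasingPartitions(): Unit
}

// A customized listener that honours the contract: a (DataChannel) constructor.
class ClearingListener(channel: DataChannel)
    extends MirrorMakerConsumerRebalanceListener(channel) {
  def beforeReleasingPartitions(): Unit = dataChannel.clear()
}

object ListenerLoader {
  // Reflective loading compiles for any class name; a missing (DataChannel)
  // constructor only surfaces at runtime as NoSuchMethodException.
  def byName(className: String, channel: DataChannel): Try[MirrorMakerConsumerRebalanceListener] =
    Try {
      Class.forName(className)
        .getConstructor(classOf[DataChannel])
        .newInstance(channel)
        .asInstanceOf[MirrorMakerConsumerRebalanceListener]
    }

  // Factory alternative: a listener without a DataChannel constructor cannot
  // be turned into this function type, so the compiler rejects it.
  def byFactory(factory: DataChannel => MirrorMakerConsumerRebalanceListener,
                channel: DataChannel): MirrorMakerConsumerRebalanceListener =
    factory(channel)
}
```

For example, ListenerLoader.byName("java.lang.String", channel) compiles but returns a failure at runtime, whereas ListenerLoader.byFactory(c => new ClearingListener(c), channel) is fully type-checked.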



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment105984>

    I think there may be a race condition here. For example, consider this sequence:
    
    1. The data channel contains only one message.
    2. The producer takes the message from the channel.
    3. dataChannel.clear() is called.
    4. numMessageUnacked.get() == 0, so offsets are committed.
    5. producer.send() is called, incrementing numMessageUnacked.
    6. Duplicate data results once the rebalance finishes.
    
    I think on line 599 we should use "while" instead of "if", but that alone does not fix the problem.
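The "while" versus "if" point can be illustrated with a minimal sketch of a wait-then-commit step. UnackedTracker and its methods are hypothetical names, not code from the patch; only the counter name numMessageUnacked comes from this review. Re-checking the counter in a loop guards against spurious wake-ups and against being woken while other sends are still unacknowledged. As the comment notes, it does not by itself close the window in steps 2 to 5.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: wait until every sent message is acked, then commit.
class UnackedTracker {
  private val lock = new Object
  val numMessageUnacked = new AtomicInteger(0)

  // Called just before producer.send().
  def onSend(): Unit = { numMessageUnacked.incrementAndGet(); () }

  // Called from the producer callback once a send is acknowledged.
  def onAck(): Unit = lock.synchronized {
    if (numMessageUnacked.decrementAndGet() == 0) lock.notifyAll()
  }

  // A `while` (not `if`) re-checks the counter after every wake-up.
  // Note: a producer that has taken a message from the channel but has not
  // yet called onSend() is invisible here, so the race above remains.
  def awaitAllAckedThen(commit: () => Unit): Unit = lock.synchronized {
    while (numMessageUnacked.get() > 0) lock.wait()
    commit()
  }
}
```

Closing the remaining window would additionally require knowing whether a producer thread is holding a message it has not yet sent, which is the extra complexity discussed in the replies.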


- Guozhang Wang


On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Nov. 24, 2014, 4:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
> 	core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> 	core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 25995: Patch for KAFKA-1650

Posted by Guozhang Wang <wa...@gmail.com>.

> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613>
> >
> >     I think there may be a race condition here. For example, consider this sequence:
> >     
> >     1. The data channel contains only one message.
> >     2. The producer takes the message from the channel.
> >     3. dataChannel.clear() is called.
> >     4. numMessageUnacked.get() == 0, so offsets are committed.
> >     5. producer.send() is called, incrementing numMessageUnacked.
> >     6. Duplicate data results once the rebalance finishes.
> >     
> >     I think on line 599 we should use "while" instead of "if", but that alone does not fix the problem.
> 
> Jiangjie Qin wrote:
>     Yes, I actually have a comment about this race condition at line 581. The reasons I'm not handling it here are:
>     1. The chance of this situation occurring is very small.
>     2. A single duplicate message does not really hurt.
>     3. The fix increases the complexity of the code (it requires looking into the producer thread status), and I'm not sure it is worth doing.
>     4. Even if we fix this, duplicates could still happen on the producer side.

Shall we at least change line 691 to "while (numMessageUnacked.get() > 0)"?


- Guozhang



On Dec. 3, 2014, 11:02 p.m., Jiangjie Qin wrote:
> 
> (Updated Dec. 3, 2014, 11:02 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
> 	core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> 	core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 25995: Patch for KAFKA-1650

Posted by Jiangjie Qin <be...@gmail.com>.

> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613>
> >
> >     I think there may be a race condition here. For example, consider this sequence:
> >     
> >     1. The data channel contains only one message.
> >     2. The producer takes the message from the channel.
> >     3. dataChannel.clear() is called.
> >     4. numMessageUnacked.get() == 0, so offsets are committed.
> >     5. producer.send() is called, incrementing numMessageUnacked.
> >     6. Duplicate data results once the rebalance finishes.
> >     
> >     I think on line 599 we should use "while" instead of "if", but that alone does not fix the problem.

Yes, I actually have a comment about this race condition at line 581. The reasons I'm not handling it here are:
1. The chance of this situation occurring is very small.
2. A single duplicate message does not really hurt.
3. The fix increases the complexity of the code (it requires looking into the producer thread status), and I'm not sure it is worth doing.
4. Even if we fix this, duplicates could still happen on the producer side.


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96
> > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line96>
> >
> >     Could you add a comment here about the in-flight-request config and its effects?

That comment is at the very beginning, in a note. Maybe we can add a comment here that refers to that note.


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 186
> > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line186>
> >
> >     I am wondering whether there are scenarios in which we want to allow a customized rebalance listener. Also, if we decide to make this customizable, we need to make it clear that the customized listener is expected to take the data channel as a constructor argument, since this is not checked at compile time.

Yes, we do foresee some use cases for a customized rebalance listener. I'll add the following comment:
"A customized consumer rebalance listener should extend MirrorMakerConsumerRebalanceListener and take the data channel as a constructor argument."


- Jiangjie




Re: Review Request 25995: Patch for KAFKA-1650

Posted by Jiangjie Qin <be...@gmail.com>.

> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > <https://reviews.apache.org/r/25995/diff/7/?file=774325#file774325line613>
> >
> >     I think there may be a race condition here. For example, consider this sequence:
> >     
> >     1. The data channel contains only one message.
> >     2. The producer takes the message from the channel.
> >     3. dataChannel.clear() is called.
> >     4. numMessageUnacked.get() == 0, so offsets are committed.
> >     5. producer.send() is called, incrementing numMessageUnacked.
> >     6. Duplicate data results once the rebalance finishes.
> >     
> >     I think on line 599 we should use "while" instead of "if", but that alone does not fix the problem.
> 
> Jiangjie Qin wrote:
>     Yes, I actually have a comment about this race condition at line 581. The reasons I'm not handling it here are:
>     1. The chance of this situation occurring is very small.
>     2. A single duplicate message does not really hurt.
>     3. The fix increases the complexity of the code (it requires looking into the producer thread status), and I'm not sure it is worth doing.
>     4. Even if we fix this, duplicates could still happen on the producer side.
> 
> Guozhang Wang wrote:
>     Shall we at least change line 691 to "while (numMessageUnacked.get() > 0)"?

Yes, it should be a while loop; I forgot to change it.


- Jiangjie



On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote:
> 
> (Updated Dec. 4, 2014, 3:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> Conflicts:
> 	core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> 	core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58 
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>