Posted to dev@kafka.apache.org by Jiangjie Qin <be...@gmail.com> on 2015/01/20 05:26:34 UTC

Review Request 30063: Patch for KAFKA-1840

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1840
    https://issues.apache.org/jira/browse/KAFKA-1840


Repository: kafka


Description
-------

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840


Diffs
-----

  core/src/main/scala/kafka/tools/MirrorMaker.scala 5cbc8103e33a0a234d158c048e5314e841da6249 

Diff: https://reviews.apache.org/r/30063/diff/


Testing
-------


Thanks,

Jiangjie Qin


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Jiangjie Qin <be...@gmail.com>.

> On Feb. 5, 2015, 2:15 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 687-689
> > <https://reviews.apache.org/r/30063/diff/4/?file=843092#file843092line687>
> >
> >     Is there an expected use case for one-to-many handlers?

Yes. For example, a user might want to accumulate a transaction until it either finishes or aborts.
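
A minimal sketch of that use case, assuming a hypothetical handler that returns zero or more records per input. The `Record`, `Marker`, and `TransactionBufferingHandler` names are illustrative only and are not taken from the patch:

```scala
import scala.collection.mutable

// Hypothetical record and marker types, for illustration only.
final case class Record(txnId: String, payload: String)

sealed trait Marker
case object Data extends Marker
case object Commit extends Marker
case object Abort extends Marker

// Hypothetical handler: buffers records per transaction and emits them
// only when the transaction commits. An abort drops the buffered records.
class TransactionBufferingHandler {
  private val pending = mutable.Map.empty[String, Vector[Record]]

  def handle(record: Record, marker: Marker): Seq[Record] = marker match {
    case Data =>
      // Nothing is produced yet; the record waits for commit or abort.
      pending.update(record.txnId, pending.getOrElse(record.txnId, Vector.empty) :+ record)
      Seq.empty
    case Commit =>
      // One input (the commit marker) yields many outputs.
      pending.remove(record.txnId).getOrElse(Vector.empty)
    case Abort =>
      pending.remove(record.txnId)
      Seq.empty
  }
}
```

Here a single commit marker releases every buffered record of its transaction, which is the kind of one-to-many behavior the question asks about.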


> On Feb. 5, 2015, 2:15 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 182-186
> > <https://reviews.apache.org/r/30063/diff/4/?file=843092#file843092line182>
> >
> >     How about using the "embedded handler config file" approach, as we did for producer and consumer? This gives us more flexibility as well as better operability.

I also felt the handler argument was a bit loosely specified and thought this over. Producer and consumer configs are all key-value pairs, but that might not be the case for a handler. With the current approach, users can decide whether to use a config file (they can always pass a file name as the command-line argument) or just pass a few simple command-line parameters without creating a config file. So we let the user decide how the command-line argument is interpreted. What do you think?
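
As a rough sketch of what "letting the user interpret the argument" could look like, assuming a hypothetical convention in which the handler treats its single argument either as a properties file name or as inline `key=value` pairs. `HandlerArgs` and `interpret` are made-up names, not part of the patch:

```scala
import java.io.{File, FileInputStream}
import java.util.Properties

object HandlerArgs {
  // Hypothetical convention: if the argument names an existing file, load it
  // as a properties file; otherwise parse it as "k1=v1,k2=v2".
  // Limitation: inline values cannot themselves contain commas.
  def interpret(arg: String): Map[String, String] =
    if (new File(arg).isFile) {
      val props = new Properties()
      val in = new FileInputStream(arg)
      try props.load(in) finally in.close()
      var result = Map.empty[String, String]
      val names = props.stringPropertyNames().iterator()
      while (names.hasNext) {
        val key = names.next()
        result += key -> props.getProperty(key)
      }
      result
    } else {
      arg.split(",").iterator.map(_.trim).filter(_.nonEmpty).map { pair =>
        val parts = pair.split("=", 2)
        parts(0) -> (if (parts.length > 1) parts(1) else "")
      }.toMap
    }
}
```

A handler with non key-value needs could ignore this convention entirely and parse the raw string however it likes.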


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review71132
-----------------------------------------------------------


On Feb. 1, 2015, 8:17 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2015, 8:17 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed previous reviews.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review71132
-----------------------------------------------------------



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment116758>

    How about using the "embedded handler config file" approach, as we did for producer and consumer? This gives us more flexibility as well as better operability.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment116761>

    Is there an expected use case for one-to-many handlers?


- Guozhang Wang


On Feb. 1, 2015, 8:17 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2015, 8:17 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed previous reviews.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review71482
-----------------------------------------------------------

Ship it!



- Guozhang Wang


On Feb. 1, 2015, 8:17 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2015, 8:17 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed previous reviews.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/
-----------------------------------------------------------

(Updated Feb. 1, 2015, 8:17 a.m.)


Review request for kafka.


Bugs: KAFKA-1840
    https://issues.apache.org/jira/browse/KAFKA-1840


Repository: kafka


Description (updated)
-------

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840


Addressed Joel's comments


Allow message handler to specify partitions for produce


Addressed Joel's comments


Addressed previous reviews.


Diffs (updated)
-----

  core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 

Diff: https://reviews.apache.org/r/30063/diff/


Testing
-------


Thanks,

Jiangjie Qin


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Jiangjie Qin <be...@gmail.com>.

> On Feb. 1, 2015, 6:34 a.m., Gwen Shapira wrote:
> > I like the idea of a message handler in MirrorMaker, but I think we can do better. Let me know if you think I'm taking it far beyond your original scope... I can add it as a follow up jira.
> > 
> > 1. I think we need to let users pass parameters to the handlers. We need a "configure" or "init" method in the handler, which MirrorMaker will call once with the right properties and the handler can use them for basic setup. For example, imagine a "regexp filter" handler - I get a regexp from the commandline and filter messages that don't match. My "init" method will set up the regexp so it will be available for all handle() calls.
> > 2. I think the handle() method should take List<Record> as input, not just a Record. MirrorMaker will be able to consume until it fills a batch (or until we waited too long), "handle" a batch - which will be able to use Scala's Sequence operators - Filter, Map, etc, and then produce an entire list of records. This sounds more efficient to me. 
> > 
> > Another comment - I think the addition of "destination partition" is unrelated to this change and Jira?

I think it's a great idea to extend the message handler to take command-line arguments. I'll add that to this patch.
I'm not sure I understand why passing in a list of records would improve performance. Shouldn't the message handler decide whether it wants to buffer some records or not? Some concerns about buffering messages outside the message handler are:
1. It is usually very difficult to decide how many messages to buffer, because that likely depends on the business logic and could vary over time.
2. It might further increase the delay.

The addition of the destination partition mainly serves partition-to-partition copy, which is related to the message handler. I feel it could go into either KAFKA-1840 or KAFKA-1839. I put it here because it seems more related to the message handler, since the handler has to assign the partition explicitly.

Maybe as a follow-up patch, we could add a command-line option --mirror-cluster that does the following:
1. Automatically create the same number of partitions in the target cluster when a new topic is created in the source cluster. (Based on KAFKA-1839)
2. Produce each message to the same destination partition as its source partition. (Follow-up patch for KAFKA-1840)
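
To illustrate item 2, here is a minimal sketch of a handler that assigns the destination partition explicitly. The `SourceRecord`, `TargetRecord`, and `PartitionPreservingHandler` names are hypothetical and only stand in for whatever types the patch actually uses:

```scala
// Hypothetical record types, for illustration; not the types used in the patch.
final case class SourceRecord(topic: String, partition: Int, key: String, value: String)
final case class TargetRecord(topic: String, partition: Option[Int], key: String, value: String)

object PartitionPreservingHandler {
  // Copies each record to the same partition number it was read from.
  // Assumes the target topic has at least as many partitions as the source,
  // which is what item 1 would guarantee.
  def handle(in: SourceRecord, targetPartitionCount: Int): Seq[TargetRecord] = {
    require(in.partition < targetPartitionCount,
      s"target topic ${in.topic} has no partition ${in.partition}")
    Seq(TargetRecord(in.topic, Some(in.partition), in.key, in.value))
  }
}
```

The `Option[Int]` partition field is an assumption that leaves room for handlers that do not care about the partition and want the producer to choose it.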


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70523
-----------------------------------------------------------


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Gwen Shapira <gs...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70523
-----------------------------------------------------------


I like the idea of a message handler in MirrorMaker, but I think we can do better. Let me know if you think I'm taking it far beyond your original scope... I can add it as a follow up jira.

1. I think we need to let users pass parameters to the handlers. We need a "configure" or "init" method in the handler, which MirrorMaker will call once with the right properties and the handler can use them for basic setup. For example, imagine a "regexp filter" handler - I get a regexp from the commandline and filter messages that don't match. My "init" method will set up the regexp so it will be available for all handle() calls.
2. I think the handle() method should take List<Record> as input, not just a Record. MirrorMaker will be able to consume until it fills a batch (or until we waited too long), "handle" a batch - which will be able to use Scala's Sequence operators - Filter, Map, etc, and then produce an entire list of records. This sounds more efficient to me. 
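
A minimal sketch of the two suggestions combined, assuming a hypothetical handler interface with an `init` method and a batch-oriented `handle`. The `Message`, `BatchHandler`, and `RegexFilterHandler` names are invented for illustration and do not come from the patch:

```scala
import scala.util.matching.Regex

// Hypothetical message type, for illustration only.
final case class Message(topic: String, value: String)

// Hypothetical handler interface: init is called once, handle once per batch.
trait BatchHandler {
  def init(args: String): Unit
  def handle(batch: Seq[Message]): Seq[Message]
}

// Keeps only the messages whose value matches the configured regexp.
class RegexFilterHandler extends BatchHandler {
  private var pattern: Regex = ".*".r

  override def init(args: String): Unit = {
    // Compile the regexp once so every handle() call can reuse it.
    pattern = args.r
  }

  override def handle(batch: Seq[Message]): Seq[Message] =
    batch.filter(m => pattern.findFirstIn(m.value).isDefined)
}
```

With a batch as input, the filtering is a single call to a standard collection operator rather than a per-record callback.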

Another comment - I think the addition of "destination partition" is unrelated to this change and Jira?

- Gwen Shapira


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Eric Olander <ol...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70509
-----------------------------------------------------------



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment115724>

    Class name should be capitalized.  Can this be an object instead of a class?


- Eric Olander


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Eric Olander <ol...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review70508
-----------------------------------------------------------



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment115723>

    Similar to the comment below, lines 220-226 are equivalent to:
    
    val customRebalanceListener = Option(options.valueOf(consumerRebalanceListenerOpt)).map {
      customRebalanceListenerClass => Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)
    }
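
For reference, a small self-contained sketch of why the two forms are equivalent. The `create` function is a hypothetical stand-in for reflective creation such as `Utils.createObject`, and `NullableToOption` is an invented name:

```scala
object NullableToOption {
  // Hypothetical stand-in for reflective object creation.
  def create(className: String): String = s"instance of $className"

  // Explicit null check, in the style of the original lines.
  def viaIfElse(className: String): Option[String] =
    if (className != null) Some(create(className)) else None

  // Option(x) is None when x is null and Some(x) otherwise,
  // so map only runs create for a non-null class name.
  def viaOption(className: String): Option[String] =
    Option(className).map(create)
}
```

Both functions return `None` for a null class name and `Some(...)` for anything else.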



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment115722>

    Something to consider:
    
    mirrorMakerMessageHandler = Option(options.valueOf(mirrorMakerMessageHandlerOpt)).map {
      mirrorMakerMessageHandlerClass => Utils.createObject[MirrorMakerMessageHandler](mirrorMakerMessageHandlerClass)
    }.getOrElse(new defaultMirrorMakerMessageHandler)
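
A runnable sketch of the same fallback pattern, with a hypothetical name-to-factory registry standing in for reflective creation. `Handler`, `DefaultHandler`, `CustomHandler`, and `HandlerSelection` are illustrative names only:

```scala
// Hypothetical handler types, for illustration only.
trait Handler { def name: String }
class DefaultHandler extends Handler { val name = "default" }
class CustomHandler extends Handler { val name = "custom" }

object HandlerSelection {
  // Assumption: a simple registry replaces reflective creation in this sketch.
  private val registry: Map[String, () => Handler] =
    Map("custom" -> (() => new CustomHandler))

  private def createObject(key: String): Handler = registry(key)()

  // A null option value falls through to the default handler.
  def select(keyOrNull: String): Handler =
    Option(keyOrNull).map(createObject).getOrElse(new DefaultHandler)
}
```

The result is a plain `Handler` rather than an `Option`, so callers never need to check whether a handler was configured.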


- Eric Olander


On Jan. 31, 2015, 2:25 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Jan. 31, 2015, 2:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Addressed Joel's comments
> 
> 
> Allow message handler to specify partitions for produce
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/
-----------------------------------------------------------

(Updated Jan. 31, 2015, 2:25 a.m.)


Review request for kafka.


Bugs: KAFKA-1840
    https://issues.apache.org/jira/browse/KAFKA-1840


Repository: kafka


Description (updated)
-------

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840


Addressed Joel's comments


Allow message handler to specify partitions for produce


Diffs (updated)
-----

  core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 

Diff: https://reviews.apache.org/r/30063/diff/


Testing
-------


Thanks,

Jiangjie Qin


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/
-----------------------------------------------------------

(Updated Jan. 20, 2015, 7:36 p.m.)


Review request for kafka.


Bugs: KAFKA-1840
    https://issues.apache.org/jira/browse/KAFKA-1840


Repository: kafka


Description (updated)
-------

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840


Addressed Joel's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/tools/MirrorMaker.scala 5cbc8103e33a0a234d158c048e5314e841da6249 

Diff: https://reviews.apache.org/r/30063/diff/


Testing
-------


Thanks,

Jiangjie Qin


Re: Review Request 30063: Patch for KAFKA-1840

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30063/#review68750
-----------------------------------------------------------



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment113160>

    ```
    val customRebalanceListenerOpt = if (customRebalanceListenerClass != null)
      Some(Utils.createObject...)
    else
      None
    ...
    consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListenerOpt)
      
    ```
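
A minimal sketch of how an internal listener could wrap an optional custom one, as the snippet above suggests. The `PartitionListener` trait and its `beforeRelease` method are hypothetical and are not the real `ConsumerRebalanceListener` signature:

```scala
// Hypothetical listener interface, for illustration only.
trait PartitionListener {
  def beforeRelease(partitions: Set[Int]): Unit
}

// Always runs its own cleanup, then delegates to the custom listener
// if one was configured.
class InternalListener(custom: Option[PartitionListener]) extends PartitionListener {
  var internalCalls = 0

  override def beforeRelease(partitions: Set[Int]): Unit = {
    internalCalls += 1 // stand-in for the internal cleanup work
    custom.foreach(_.beforeRelease(partitions))
  }
}

// Simple custom listener that records the partitions it was told about.
class RecordingListener extends PartitionListener {
  var seen: Set[Int] = Set.empty

  override def beforeRelease(partitions: Set[Int]): Unit = {
    seen = seen ++ partitions
  }
}
```

Passing the custom listener as an `Option` keeps the null check in one place, at construction time.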



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/30063/#comment113161>

    See comment above


- Joel Koshy


On Jan. 20, 2015, 4:26 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30063/
> -----------------------------------------------------------
> 
> (Updated Jan. 20, 2015, 4:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1840
>     https://issues.apache.org/jira/browse/KAFKA-1840
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1840
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 5cbc8103e33a0a234d158c048e5314e841da6249 
> 
> Diff: https://reviews.apache.org/r/30063/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>