You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Guozhang Wang <gu...@linkedin.com> on 2013/11/25 21:53:23 UTC

Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/
-----------------------------------------------------------

(Updated Nov. 25, 2013, 8:53 p.m.)


Review request for kafka.


Summary (updated)
-----------------

KAFKA-1140.v2: addressed Jun's comments


Bugs: KAFKA-1140
    https://issues.apache.org/jira/browse/KAFKA-1140


Repository: kafka


Description
-------

KAFKA-1140.v1


Dummy


Diffs (updated)
-----

  core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 

Diff: https://reviews.apache.org/r/15805/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

Posted by Jun Rao <ju...@gmail.com>.
Yes, we will have to pass in the decoders to MessageAndMetadata.

Thanks,

Jun


On Tue, Nov 26, 2013 at 10:09 AM, Guozhang Wang <gu...@linkedin.com> wrote:

>  Does this enforce the key/value decoder to be passed also into
> MessageAndMetadata?
>
> Guozhang
>  ------------------------------
> *From:* Jun Rao [noreply@reviews.apache.org] on behalf of Jun Rao [
> junrao@gmail.com]
> *Sent:* Tuesday, November 26, 2013 9:30 AM
> *To:* kafka; Jun Rao; Guozhang Wang
> *Subject:* Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's
> comments
>
>     This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15805/
>
> Thinking about this a bit more. It seems that a better approach is to move the decoding into MessageAndMetadata. We define two methods key() and message() that do the decoding and throw a RuntimeException back to the caller if decoding fails. This way, the client can still get the metadata (offset, partitionId, etc) associated with a message even when decoding fails.
>
>
> - Jun Rao
>
> On November 25th, 2013, 8:55 p.m. UTC, Guozhang Wang wrote:
>   Review request for kafka.
> By Guozhang Wang.
>
> *Updated Nov. 25, 2013, 8:55 p.m.*
> *Bugs: *KAFKA-1140 <https://issues.apache.org/jira/browse/KAFKA-1140>
> *Repository: *kafka
> Description
>
> KAFKA-1140.v2
>
>
> KAFKA-1140.v1
>
>
> Dummy
>
>   Diffs
>
>    - core/src/main/scala/kafka/consumer/ConsumerIterator.scala
>    (a4227a49684c7de08e07cb1f3a10d2f76ba28da7)
>    - core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
>    (ef1de8321c713cd9d27ef937216f5b76a5d8c574)
>
> View Diff <https://reviews.apache.org/r/15805/diff/>
>

RE: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

Posted by Guozhang Wang <gu...@linkedin.com>.
Does this enforce the key/value decoder to be passed also into MessageAndMetadata?

Guozhang
________________________________
From: Jun Rao [noreply@reviews.apache.org] on behalf of Jun Rao [junrao@gmail.com]
Sent: Tuesday, November 26, 2013 9:30 AM
To: kafka; Jun Rao; Guozhang Wang
Subject: Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/15805/


Thinking about this a bit more. It seems that a better approach is to move the decoding into MessageAndMetadata. We define two methods key() and message() that do the decoding and throw a RuntimeException back to the caller if decoding fails. This way, the client can still get the metadata (offset, partitionId, etc) associated with a message even when decoding fails.


- Jun Rao


On November 25th, 2013, 8:55 p.m. UTC, Guozhang Wang wrote:

Review request for kafka.
By Guozhang Wang.

Updated Nov. 25, 2013, 8:55 p.m.

Bugs: KAFKA-1140<https://issues.apache.org/jira/browse/KAFKA-1140>
Repository: kafka
Description

KAFKA-1140.v2


KAFKA-1140.v1


Dummy


Diffs

  *   core/src/main/scala/kafka/consumer/ConsumerIterator.scala (a4227a49684c7de08e07cb1f3a10d2f76ba28da7)
  *   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (ef1de8321c713cd9d27ef937216f5b76a5d8c574)

View Diff<https://reviews.apache.org/r/15805/diff/>


Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/#review29438
-----------------------------------------------------------


Thinking about this a bit more. It seems that a better approach is to move the decoding into MessageAndMetadata. We define two methods key() and message() that do the decoding and throw a RuntimeException back to the caller if decoding fails. This way, the client can still get the metadata (offset, partitionId, etc) associated with a message even when decoding fails.

- Jun Rao


On Nov. 25, 2013, 8:55 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15805/
> -----------------------------------------------------------
> 
> (Updated Nov. 25, 2013, 8:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1140
>     https://issues.apache.org/jira/browse/KAFKA-1140
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1140.v2
> 
> 
> KAFKA-1140.v1
> 
> 
> Dummy
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 
> 
> Diff: https://reviews.apache.org/r/15805/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 15805: KAFKA-1140.v4

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/
-----------------------------------------------------------

(Updated Nov. 27, 2013, 2:29 a.m.)


Review request for kafka.


Summary (updated)
-----------------

KAFKA-1140.v4


Bugs: KAFKA-1140
    https://issues.apache.org/jira/browse/KAFKA-1140


Repository: kafka


Description (updated)
-------

KAFKA-1140.v4


Diffs (updated)
-----

  core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
  core/src/main/scala/kafka/message/MessageAndMetadata.scala 20c0e7004282c5a71228bc04adab7f1997cb5b98 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 

Diff: https://reviews.apache.org/r/15805/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 15805: KAFKA-1140.v3: addressed more of Jun's comments, changed the API of MessageAndMetadata

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/#review29465
-----------------------------------------------------------



core/src/main/scala/kafka/consumer/ConsumerIterator.scala
<https://reviews.apache.org/r/15805/#comment56691>

    It seems that we can leave ConsumerIterator mostly unchanged and just instantiate MessageAndMetadata differently.



core/src/main/scala/kafka/message/MessageAndMetadata.scala
<https://reviews.apache.org/r/15805/#comment56692>

    We should name this method message() to maintain the same api. Then, there is no need for code change in consumer apps.



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56694>

    Could we reference it as the predefined value ConsumerConfig.ConsumerTimeoutMs?



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56693>

    Let's make sure iter.hasNext() returns true too.


- Jun Rao


On Nov. 26, 2013, 10:41 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15805/
> -----------------------------------------------------------
> 
> (Updated Nov. 26, 2013, 10:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1140
>     https://issues.apache.org/jira/browse/KAFKA-1140
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1140.v3
> 
> 
> KAFKA-1140.v2
> 
> 
> KAFKA-1140.v1
> 
> 
> Dummy
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c23e9c221ebe4ba9fcdcde5ba26f959095e 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
>   core/src/main/scala/kafka/message/MessageAndMetadata.scala 20c0e7004282c5a71228bc04adab7f1997cb5b98 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c22dea30ae66508d0ba00597e3254eab72 
>   core/src/main/scala/kafka/tools/ReplayLogProducer.scala 814d61ae477cef5e56723e1f1a86c3b2e9b7c1ea 
>   core/src/test/scala/other/kafka/TestLogCleaning.scala 22b16e54980e30f60dbf2fbe46c16398d654ba21 
>   core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca303e91853e707644fde96783cfbe4322e 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8fe7259c9abd2c883e423a1ea7ea19c367f0c1a4 
>   core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala 43af649f3297644e8caea176a79cadde745be2a4 
>   examples/src/main/java/kafka/examples/Consumer.java 13135b954f3078eeb7394822b0db25470b746f03 
>   perf/src/main/scala/kafka/perf/ConsumerPerformance.scala ec3cd295266e162cd884799b491ae48a51ac8f10 
> 
> Diff: https://reviews.apache.org/r/15805/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 15805: KAFKA-1140.v3: addressed more of Jun's comments, changed the API of MessageAndMetadata

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/
-----------------------------------------------------------

(Updated Nov. 26, 2013, 10:41 p.m.)


Review request for kafka.


Summary (updated)
-----------------

KAFKA-1140.v3: addressed more of Jun's comments, changed the API of MessageAndMetadata


Bugs: KAFKA-1140
    https://issues.apache.org/jira/browse/KAFKA-1140


Repository: kafka


Description (updated)
-------

KAFKA-1140.v3


KAFKA-1140.v2


KAFKA-1140.v1


Dummy


Diffs (updated)
-----

  core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c23e9c221ebe4ba9fcdcde5ba26f959095e 
  core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
  core/src/main/scala/kafka/message/MessageAndMetadata.scala 20c0e7004282c5a71228bc04adab7f1997cb5b98 
  core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c22dea30ae66508d0ba00597e3254eab72 
  core/src/main/scala/kafka/tools/ReplayLogProducer.scala 814d61ae477cef5e56723e1f1a86c3b2e9b7c1ea 
  core/src/test/scala/other/kafka/TestLogCleaning.scala 22b16e54980e30f60dbf2fbe46c16398d654ba21 
  core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca303e91853e707644fde96783cfbe4322e 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8fe7259c9abd2c883e423a1ea7ea19c367f0c1a4 
  core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala 43af649f3297644e8caea176a79cadde745be2a4 
  examples/src/main/java/kafka/examples/Consumer.java 13135b954f3078eeb7394822b0db25470b746f03 
  perf/src/main/scala/kafka/perf/ConsumerPerformance.scala ec3cd295266e162cd884799b491ae48a51ac8f10 

Diff: https://reviews.apache.org/r/15805/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Nov. 26, 2013, 2:45 a.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala, lines 102-103
> > <https://reviews.apache.org/r/15805/diff/3/?file=390866#file390866line102>
> >
> >     We probably should just use ConsumerConfig.ConsumerTimeoutMs here, to make it clear that we don't want to timeout.


> On Nov. 26, 2013, 2:45 a.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala, lines 98-99
> > <https://reviews.apache.org/r/15805/diff/3/?file=390866#file390866line98>
> >
> >     We can just add messageSet to queue directly. In this test, topicInfo is irrelevant.
> >

We still need this for checking consumed offset.


> On Nov. 26, 2013, 2:45 a.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala, lines 108-126
> > <https://reviews.apache.org/r/15805/diff/3/?file=390866#file390866line108>
> >
> >     To really test that we can iterate over messages with decoding errors, could we have the first half of messages with decoding errors and the second half without, and make sure that we get the correct offsets when iterating messages in the second half?

We can iterate all the messages with decoding errors, but first try to get their offsets and validate they are correct, and only expecting exceptions when calling value() functions.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/#review29412
-----------------------------------------------------------


On Nov. 25, 2013, 8:55 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15805/
> -----------------------------------------------------------
> 
> (Updated Nov. 25, 2013, 8:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1140
>     https://issues.apache.org/jira/browse/KAFKA-1140
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1140.v2
> 
> 
> KAFKA-1140.v1
> 
> 
> Dummy
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 
> 
> Diff: https://reviews.apache.org/r/15805/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/#review29412
-----------------------------------------------------------



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56636>

    We can just add messageSet to queue directly. In this test, topicInfo is irrelevant.
    



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56637>

    We probably should just use ConsumerConfig.ConsumerTimeoutMs here, to make it clear that we don't want to timeout.



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56640>

    To really test that we can iterate over messages with decoding errors, could we have the first half of messages with decoding errors and the second half without, and make sure that we get the correct offsets when iterating messages in the second half?



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56639>

    Instead of throwing an exception, we should fail the test.



core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
<https://reviews.apache.org/r/15805/#comment56638>

    Not sure why this is useful. ConsumedOffset is a val and will never change.


- Jun Rao


On Nov. 25, 2013, 8:55 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/15805/
> -----------------------------------------------------------
> 
> (Updated Nov. 25, 2013, 8:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1140
>     https://issues.apache.org/jira/browse/KAFKA-1140
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1140.v2
> 
> 
> KAFKA-1140.v1
> 
> 
> Dummy
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
>   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 
> 
> Diff: https://reviews.apache.org/r/15805/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 15805: KAFKA-1140.v2: addressed Jun's comments

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/15805/
-----------------------------------------------------------

(Updated Nov. 25, 2013, 8:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1140
    https://issues.apache.org/jira/browse/KAFKA-1140


Repository: kafka


Description (updated)
-------

KAFKA-1140.v2


KAFKA-1140.v1


Dummy


Diffs (updated)
-----

  core/src/main/scala/kafka/consumer/ConsumerIterator.scala a4227a49684c7de08e07cb1f3a10d2f76ba28da7 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ef1de8321c713cd9d27ef937216f5b76a5d8c574 

Diff: https://reviews.apache.org/r/15805/diff/


Testing
-------


Thanks,

Guozhang Wang