You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2014/12/01 20:44:39 UTC

Re: Review Request 27391: Fix KAFKA-1634

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Dec. 1, 2014, 7:44 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
  core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe 
  core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.

> On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, lines 79-85
> > <https://reviews.apache.org/r/27391/diff/1/?file=743505#file743505line79>
> >
> >     Perhaps these code can just be changed to 
> >     
> >     this(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID, offsetData);

This cannot be forwarded as in super() call we need to specify the version id.


> On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, lines 97-106
> > <https://reviews.apache.org/r/27391/diff/1/?file=743505#file743505line97>
> >
> >     Same here. These code can just be replaced by forwarding the request to the next constructor.

Ditto above.


> On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, lines 524-525
> > <https://reviews.apache.org/r/27391/diff/7/?file=779891#file779891line524>
> >
> >     Shouldn't we just set the expiration time field to expirationTimestamp, instead of taking it from offsetAndMetadata?

For v0/1, we should just take the value of the offsetAndMetadata.timestamp, for v2 we will take the value of expirationTimestamp. This has been changed in the latest patch where offsetAndMetadata.timestamp is updated accordingly before calling offsetCommitValue().


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review67114
-----------------------------------------------------------


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Dec. 2, 2014, 2:03 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
>   core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe 
>   core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review67114
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment110938>

    Perhaps these code can just be changed to 
    
    this(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID, offsetData);



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment110939>

    Same here. These code can just be replaced by forwarding the request to the next constructor.



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment111015>

    Shouldn't we just set the expiration time field to expirationTimestamp, instead of taking it from offsetAndMetadata?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/27391/#comment111016>

    Should we remove the commented out code?


- Jun Rao


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Dec. 2, 2014, 2:03 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
>   core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe 
>   core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.

> On Jan. 22, 2015, 1:56 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 145-166
> > <https://reviews.apache.org/r/27391/diff/9/?file=829147#file829147line145>
> >
> >     I am not sure that we should change the timestamp for offsets produced in V0 and V1. There could be data in the offset topic already written by 0.8.2 code. See the other comment in OffsetManager on expiring.

I think if it (the commit timestamp) is set to default value -1, we should override it according to the wiki:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest

Otherwise it should not be overriden.


> On Jan. 22, 2015, 1:56 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, lines 121-123
> > <https://reviews.apache.org/r/27391/diff/9/?file=829150#file829150line121>
> >
> >     Does that change work correctly with offsets already stored in v0 and v1 format using 0.8.2 code? Would those offsets still be expired at the right time?

Changed the logic of overriding commit / expire timestamps as the following:

1. If version <= 1 or retention time is default (-1) override retention time to server default value.
2. If the original time stamp (i.e. the commit timestamp) is set to default (-1), override to the current time.
3. After 2) is done, compute the expire time to be commit timestamp + retention time.
4. Hence the above logic of checking expiration will be compatible (i.e. expiration time < now).


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review69106
-----------------------------------------------------------


On Jan. 22, 2015, 12:43 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 22, 2015, 12:43 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e 
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review69106
-----------------------------------------------------------


Thanks for the patch. A few more comments.


clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment113727>

    Would it be better to use -1L as the default retention time? MAX_VALUE could be useful for the case when a client wants the offset never to be expired.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment113724>

    It seems that our coding convention has been not to use {} on a single line in the body. So, we use
    if ()
      do sth
    instead of 
    if () {
      do sth
    }



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment113730>

    I am not sure that we should change the timestamp for offsets produced in V0 and V1. There could be data in the offset topic already written by 0.8.2 code. See the other comment in OffsetManager on expiring.



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment113729>

    Does that change work correctly with offsets already stored in v0 and v1 format using 0.8.2 code? Would those offsets still be expired at the right time?


- Jun Rao


On Jan. 22, 2015, 12:43 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 22, 2015, 12:43 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e 
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Rebase patch and incorporate Joel's comments

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review77970
-----------------------------------------------------------

Ship it!


Thank you for the updated patch. Just one minor comment below on suggested comments to add. Otherwise I think this looks good!


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment126341>

    may be worth adding comments to describe these scenarios:
    ```
    // - Commit timestamp is always set to now.
    // - "Default" expiration timestamp is now + retention (and retention may be overridden if v2)
    // - Expire timestamp is computed differently for v1 and v2.
    //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
    //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
    //   - If v2 we use the default expiration timestamp
    ```


- Joel Koshy


On March 26, 2015, 7:28 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated March 26, 2015, 7:28 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Always override commit time to now, only change expire time accorinding to the commit time.
> 2. Override expire time upon loading offsets of version 1.
> 3. Change new consumer to use the new version 2.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 436f9b2a843bc8c44d17403f5880b6736a5d56a8 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 7672a3a0d674d101078651956d7122059e59e6d5 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1584a92447d276b6206d212b0b5487c352044154 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala cca815a128419e146feff53adaeddc901bb5de1f 
>   core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 
>   core/src/main/scala/kafka/server/OffsetManager.scala d05e14d2018c0a0b5d22697313e9abde4363d65d 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala e4d0435eb4213597c2fb9c3f2093c227de53a417 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Rebase patch and incorporate Joel's comments

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated March 26, 2015, 7:28 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

1. Always override commit time to now, only change expire time accorinding to the commit time.
2. Override expire time upon loading offsets of version 1.
3. Change new consumer to use the new version 2.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 436f9b2a843bc8c44d17403f5880b6736a5d56a8 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 7672a3a0d674d101078651956d7122059e59e6d5 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1584a92447d276b6206d212b0b5487c352044154 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala cca815a128419e146feff53adaeddc901bb5de1f 
  core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/OffsetManager.scala d05e14d2018c0a0b5d22697313e9abde4363d65d 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala e4d0435eb4213597c2fb9c3f2093c227de53a417 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Rebase patch and incorporate Joel's comments

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated March 26, 2015, 7:27 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

TBD


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 436f9b2a843bc8c44d17403f5880b6736a5d56a8 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 7672a3a0d674d101078651956d7122059e59e6d5 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1584a92447d276b6206d212b0b5487c352044154 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala cca815a128419e146feff53adaeddc901bb5de1f 
  core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/OffsetManager.scala d05e14d2018c0a0b5d22697313e9abde4363d65d 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala e4d0435eb4213597c2fb9c3f2093c227de53a417 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Rebase patch and incorporate Joel's comments

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated March 26, 2015, 7:17 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

1. Always override commit time to now, only change expire time accorinding to the commit time.
2. Override expire time upon loading offsets of version 1.
3. Change new consumer to use the new version 2.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 436f9b2a843bc8c44d17403f5880b6736a5d56a8 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 7672a3a0d674d101078651956d7122059e59e6d5 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1584a92447d276b6206d212b0b5487c352044154 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala cca815a128419e146feff53adaeddc901bb5de1f 
  core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/OffsetManager.scala d05e14d2018c0a0b5d22697313e9abde4363d65d 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala e4d0435eb4213597c2fb9c3f2093c227de53a417 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Rebase patch and incorporate Joel's comments

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated March 26, 2015, 7:16 p.m.)


Review request for kafka.


Summary (updated)
-----------------

Rebase patch and incorporate Joel's comments


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

v1


last pass


minor TODO


Incorporate Joel's comments


incorporate Joel's comments round two


Joel's comment round three


Joel round four


dummy


Joel round four


unit tests


Joel and Jun's comments


incorporate Joel and Jun's comments


Joel's comments


Jun's comments


minor test fix


address comments


rebase and fixes


revert log4j


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 436f9b2a843bc8c44d17403f5880b6736a5d56a8 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 7672a3a0d674d101078651956d7122059e59e6d5 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1584a92447d276b6206d212b0b5487c352044154 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala cca815a128419e146feff53adaeddc901bb5de1f 
  core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/OffsetManager.scala d05e14d2018c0a0b5d22697313e9abde4363d65d 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala e4d0435eb4213597c2fb9c3f2093c227de53a417 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Feb. 6, 2015, 7:01 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

Incorporated Joel's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 09a6f11163ecb1e733c604ade04646e83bbc0c85 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ff89f0e37d5fa787b0218eff86d169aaeae2107b 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
  core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a1f72f8c2042ff2a43af503b2e06f84706dad9db 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.

> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/api/OffsetCommitRequest.scala, line 48
> > <https://reviews.apache.org/r/27391/diff/11/?file=832423#file832423line48>
> >
> >     I our convention is to include the if in the previous line.

I checked the code base and it seems we do not have a consensus here.. and personally I would prefer this as it actually make the logic clearer.


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 36
> > <https://reviews.apache.org/r/27391/diff/11/?file=832425#file832425line36>
> >
> >     (This is also a public API change - although you did add an Object wrapper further down that comes close to the original API.)

I think the wrapper MessageAndMetadata preserves the existing public API right?


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 65
> > <https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line65>
> >
> >     Should we call this maxOffsetRetentionMs instead?

Not exactly, as it is just the default offset retention, not the upper limit: users can specify a value larger than this default and it will still be accepted.


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 163
> > <https://reviews.apache.org/r/27391/diff/11/?file=832427#file832427line163>
> >
> >     Shouldn't the commit timestamp _always_ be set to the current time?
> >     
> >     What I was thinking is this:
> >     If v0:
> >     - An explicit timestamp is provided only to override the v0 default retention which is add the server-side retention to the current timestamp. The (true) commit timestamp - i.e., receive time is useful for debugging purposes. So if an explicit timestamp is provided in v0 then use that to compute the absolute expire timestamp which will be the given commit timestamp; so you would store (commitTimestamp = now; expireTimestamp = given commitTimeStamp); if v0 and commit timestamp is default, then you would store (commitTimestamp = now, expireTimestamp = now + offsetRetention)
> >     - if v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)
> >     
> >     This way, you should have correct expiration behavior for v0, v1 and v2 and at the same time have the true commit timestamp - i.e., the receive time at the broker which is useful for debugging. (also see comment in OffsetManager)

In v0/v1, the commit timestamp can be specified as a future timestamp so the expiration timestamp = commit timestamp + retention (in v0/v1 it is always the default value).

This behavior should not be respected, i.e. offsets already stored in v0 and v1 format should be expired correctly using 0.8.2 code. Details can be found in Jun's comments and my replies.


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 557
> > <https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line557>
> >
> >     follow-up from above comment...
> >     and here you would set:
> >     commitTimestamp = timestamp
> >     expireTimestamp = timestamp
> >     
> >     So do you think this would work overall?
> >     
> >     I could be wrong - this patch has proven to be much trickier than we originally thought.

See the comments above.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
-----------------------------------------------------------


On Jan. 24, 2015, 12:06 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 24, 2015, 12:06 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Joel Koshy <jj...@gmail.com>.

> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/api/OffsetCommitRequest.scala, line 48
> > <https://reviews.apache.org/r/27391/diff/11/?file=832423#file832423line48>
> >
> >     I our convention is to include the if in the previous line.
> 
> Guozhang Wang wrote:
>     I checked the code base and it seems we do not have a consensus here.. and personally I would prefer this as it actually make the logic clearer.

We don't have a formal convention here but I think we should and incorporate it into our coding guidelines. The problem with a separate line is that at first glance (especially with just two character indentation) it does not seem to be associated with the assignment. Also, most current occurrences put the if on the same line.
```
find . -name "*.scala" -exec pcregrep -c '=(\s)*if' {} \; | grep -v 0 | paste -s -d+ | bc
61
find . -name "*.scala" -exec pcregrep -Mc '=(\s)*\n(\s)*if' {} \; | grep -v 0 | paste -s -d+ | bc
36
```


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 36
> > <https://reviews.apache.org/r/27391/diff/11/?file=832425#file832425line36>
> >
> >     (This is also a public API change - although you did add an Object wrapper further down that comes close to the original API.)
> 
> Guozhang Wang wrote:
>     I think the wrapper MessageAndMetadata preserves the existing public API right?

You mean the wrapper object? It comes close, but not quite - since you can instantiate a case class with a `new` keyword or without. You need it for the secondary constructors of the case class. With the object wrapper we assume that the objects were being constructed without the new. I don't know how many people actually used it though, but it was part of the public API since you would need to create those objects to form an OffsetCommitRequest.


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 163
> > <https://reviews.apache.org/r/27391/diff/11/?file=832427#file832427line163>
> >
> >     Shouldn't the commit timestamp _always_ be set to the current time?
> >     
> >     What I was thinking is this:
> >     If v0:
> >     - An explicit timestamp is provided only to override the v0 default retention which is add the server-side retention to the current timestamp. The (true) commit timestamp - i.e., receive time is useful for debugging purposes. So if an explicit timestamp is provided in v0 then use that to compute the absolute expire timestamp which will be the given commit timestamp; so you would store (commitTimestamp = now; expireTimestamp = given commitTimeStamp); if v0 and commit timestamp is default, then you would store (commitTimestamp = now, expireTimestamp = now + offsetRetention)
> >     - if v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)
> >     
> >     This way, you should have correct expiration behavior for v0, v1 and v2 and at the same time have the true commit timestamp - i.e., the receive time at the broker which is useful for debugging. (also see comment in OffsetManager)
> 
> Guozhang Wang wrote:
>     In v0/v1, the commit timestamp can be specified as a future timestamp so the expiration timestamp = commit timestamp + retention (in v0/v1 it is always the default value).
>     
>     This behavior should not be respected, i.e. offsets already stored in v0 and v1 format should be expired correctly using 0.8.2 code. Details can be found in Jun's comments and my replies.

I don't think we are on the same page here. Let's discuss offline to follow-up.


> On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 65
> > <https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line65>
> >
> >     Should we call this maxOffsetRetentionMs instead?
> 
> Guozhang Wang wrote:
>     Not exactly, as it is just the default offset retention, not the upper limit: users can specify a value larger than this default and it will still be accepted.

Yes you are right.


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
-----------------------------------------------------------


On Feb. 6, 2015, 7:01 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2015, 7:01 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 09a6f11163ecb1e733c604ade04646e83bbc0c85 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ff89f0e37d5fa787b0218eff86d169aaeae2107b 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
>   core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a1f72f8c2042ff2a43af503b2e06f84706dad9db 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/27391/#comment116277>

    remove both



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment116310>

    Should we revert this rename since this is part of the public API? I would be surprised if people are using it though - but still.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116314>

    I our convention is to include the if in the previous line.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116315>

    same here



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116316>

    and here



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116317>

    and here



core/src/main/scala/kafka/api/OffsetFetchRequest.scala
<https://reviews.apache.org/r/27391/#comment116318>

    Can you run "organize imports"? Some of these seem redundant/unnecessary.



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
<https://reviews.apache.org/r/27391/#comment116320>

    (This is also a public API change - although you did add an Object wrapper further down that comes close to the original API.)



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment116322>

    Shouldn't the commit timestamp _always_ be set to the current time?
    
    What I was thinking is this:
    If v0:
    - An explicit timestamp is provided only to override the v0 default retention which is add the server-side retention to the current timestamp. The (true) commit timestamp - i.e., receive time is useful for debugging purposes. So if an explicit timestamp is provided in v0 then use that to compute the absolute expire timestamp which will be the given commit timestamp; so you would store (commitTimestamp = now; expireTimestamp = given commitTimeStamp); if v0 and commit timestamp is default, then you would store (commitTimestamp = now, expireTimestamp = now + offsetRetention)
    - if v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)
    
    This way, you should have correct expiration behavior for v0, v1 and v2 and at the same time have the true commit timestamp - i.e., the receive time at the broker which is useful for debugging. (also see comment in OffsetManager)



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment116374>

    Should we call this maxOffsetRetentionMs instead?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment116372>

    follow-up from above comment...
    and here you would set:
    commitTimestamp = timestamp
    expireTimestamp = timestamp
    
    So do you think this would work overall?
    
    I could be wrong - this patch has proven to be much trickier than we originally thought.


- Joel Koshy


On Jan. 24, 2015, 12:06 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 24, 2015, 12:06 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Jan. 24, 2015, 12:06 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
-------

Incorporated Jun's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
  core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
  core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Jan. 23, 2015, 2:47 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

Incorporated Jun's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
  core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
  core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Jan. 22, 2015, 12:43 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

Incorporated Joel's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e 
  core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
  core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.

> On Jan. 20, 2015, 4:35 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 147
> > <https://reviews.apache.org/r/27391/diff/8/?file=822017#file822017line147>
> >
> >     I just realized that if we have a v0 or v1 request then we use the offset manager default retention which is one day.
> >     
> >     However, if it is v2 and the user does not override it in the offset commit request, then the retention defaults to Long.MaxValue. I think that default makes sense for OffsetCommitRequest. However, I think the broker needs to protect itself and have an upper threshold for retention. i.e., maybe we should have a maxRetentionMs config in the broker.
> >     
> >     What do you think?

Agreed, I change the behavior to be "use the default value if it is < v2 or if the retention period is default value (meaning user did not specify it)".


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review68729
-----------------------------------------------------------


On Jan. 14, 2015, 11:50 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 14, 2015, 11:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel and Jun's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e 
>   core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
>   core/src/main/scala/kafka/server/KafkaServer.scala a069eb9272c92ef62387304b60de1fe473d7ff49 
>   core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 4a3a5b264a021e55c39f4d7424ce04ee591503ef 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review68729
-----------------------------------------------------------



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment113113>

    I just realized that if we have a v0 or v1 request then we use the offset manager default retention which is one day.
    
    However, if it is v2 and the user does not override it in the offset commit request, then the retention defaults to Long.MaxValue. I think that default makes sense for OffsetCommitRequest. However, I think the broker needs to protect itself and have an upper threshold for retention. i.e., maybe we should have a maxRetentionMs config in the broker.
    
    What do you think?



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment113111>

    if it is _after_ v2



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/27391/#comment113114>

    This file needs to be rebased.


- Joel Koshy


On Jan. 14, 2015, 11:50 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 14, 2015, 11:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel and Jun's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e 
>   core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
>   core/src/main/scala/kafka/server/KafkaServer.scala a069eb9272c92ef62387304b60de1fe473d7ff49 
>   core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 4a3a5b264a021e55c39f4d7424ce04ee591503ef 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Jan. 14, 2015, 11:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
-------

Incorporated Joel and Jun's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e 
  core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
  core/src/main/scala/kafka/server/KafkaServer.scala a069eb9272c92ef62387304b60de1fe473d7ff49 
  core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
  core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 4a3a5b264a021e55c39f4d7424ce04ee591503ef 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.

> On Dec. 18, 2014, 8:42 a.m., Joel Koshy wrote:
> > core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala, line 224
> > <https://reviews.apache.org/r/27391/diff/7/?file=779893#file779893line224>
> >
> >     If the offset in fact did expire, the assertion itself won't fail - i.e., you will get a NoSuchElementException
> >     
> >     Same comments apply to checks below.

Not sure I understand this: When the offset expire the server will return an error code with offset set to -1 upon receiving offset fetch request.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review65462
-----------------------------------------------------------


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Dec. 2, 2014, 2:03 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
>   core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe 
>   core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review65462
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment108626>

    can you add a comment: // only v0, v1 of offsetcommitrequest



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment108637>

    Not introduced by your patch, but it is odd that these are named topicResponseObj and partitionResponse below - probably an artifact of copy/paste. Can you do a rename here before checking in?



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
<https://reviews.apache.org/r/27391/#comment108638>

    I think we discussed before that timestamp does not need to be a var. We can use the case class copy method to make a copy + edit.



core/src/main/scala/kafka/server/KafkaServer.scala
<https://reviews.apache.org/r/27391/#comment108642>

    Thanks for fixing this



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment108643>

    I think it would be better to move this to just before the call to offsetCommitValue in the loop in line 228. This method should only be responsible for taking the offsetAndMetadata and converting that into the on-disk bytes and should not concern itself with setting a critical field like the expiration timestamp. I was actually looking for where this happens (i.e., setting the expiration time) and took me a while to realize it was hidden in here.



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment108644>

    I think we can make this and some other methods here private.



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment108645>

    private



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment108646>

    Also, let us use a case class instead of a tuple



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/27391/#comment108655>

    Rather than sleep, we should improve OffsetManager to take in a MockScheduler instance - we can pass through the time instance from KafkaServer to offsetManager as we do for LogManager and replicaManager. That way we can advance time with MockTime. This test will need to change from OffsetCommitTest to OffsetManagerTest and we will just test the OffsetManager. Can you file a jira for that? Although that would make sense only after you check this in.



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/27391/#comment108647>

    If the offset in fact did expire, the assertion itself won't fail - i.e., you will get a NoSuchElementException
    
    Same comments apply to checks below.


- Joel Koshy


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Dec. 2, 2014, 2:03 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
>   core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe 
>   core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 27391: Fix KAFKA-1634

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
-----------------------------------------------------------

(Updated Dec. 2, 2014, 2:03 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
    https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
-------

Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
  core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe 
  core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
-------


Thanks,

Guozhang Wang