Posted to dev@samza.apache.org by Shanthoosh Venkataraman <sa...@gmail.com> on 2016/10/03 17:00:50 UTC

Review Request 52476: Do not load task store which are older than delete tombstones.

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog from which some delete tombstones have already been removed produces an inconsistent local store: a key that was deleted at a changelog offset later than the one recorded in the store's offset file keeps its old value locally, because the tombstone that would have removed it is gone. During container startup, this patch therefore deletes every local store whose offset file was last modified more than delete.retention.ms ago.
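The failure mode described above can be illustrated with a toy Java model. This is a sketch under simplifying assumptions, not Samza or Kafka code: ChangelogRecord, compact and restore are hypothetical names, compaction is reduced to "keep the latest record per key, and drop it too if it is an expired tombstone", and a local store is modeled as a snapshot plus the offset stored in its offset file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a compacted changelog; not Samza or Kafka code.
class TombstoneReplayDemo {

  // One changelog record; a null value marks a delete tombstone.
  static final class ChangelogRecord {
    final long offset;
    final String key;
    final String value;

    ChangelogRecord(long offset, String key, String value) {
      this.offset = offset;
      this.key = key;
      this.value = value;
    }
  }

  // Simplified compaction: keep only the latest record per key, and drop that
  // record as well if it is a tombstone whose retention has expired.
  static List<ChangelogRecord> compact(List<ChangelogRecord> log, boolean tombstonesExpired) {
    Map<String, ChangelogRecord> latest = new HashMap<>();
    for (ChangelogRecord r : log) {
      latest.put(r.key, r);
    }
    List<ChangelogRecord> compacted = new ArrayList<>();
    for (ChangelogRecord r : log) {
      boolean isLatest = latest.get(r.key) == r;
      boolean expiredTombstone = r.value == null && tombstonesExpired;
      if (isLatest && !expiredTombstone) {
        compacted.add(r);
      }
    }
    return compacted;
  }

  // Restore a store: start from a local snapshot and apply every record whose
  // offset is greater than the offset recorded in the snapshot's offset file.
  static Map<String, String> restore(Map<String, String> snapshot, long snapshotOffset,
                                     List<ChangelogRecord> log) {
    Map<String, String> store = new HashMap<>(snapshot);
    for (ChangelogRecord r : log) {
      if (r.offset <= snapshotOffset) {
        continue;
      }
      if (r.value == null) {
        store.remove(r.key);
      } else {
        store.put(r.key, r.value);
      }
    }
    return store;
  }
}
```

With the records (0, k, v1) and (1, k, tombstone), a snapshot {k=v1} at offset 0 restores correctly while the tombstone is retained, but keeps k=v1 once the tombstone has expired; restoring an empty store from the beginning yields an empty store in both cases. Discarding the stale snapshot turns the first situation into the second.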


Diffs
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 4, 2016, 1:57 a.m., Jake Maes wrote:
> > samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala, line 293
> > <https://reviews.apache.org/r/52476/diff/1/?file=1518377#file1518377line293>
> >
> >     2 nits:
> >     1. Can you swap this test with the next one in terms of position? The tests above and below this one are related, so this one breaks them up, which just adds cognitive load for the reader.
> >     2. I'm all for descriptive names, but this is almost un-tweet-able. :-) Could it be shortened to: testStoreDeletedWhenOffsetFileOlderThanDeleteRetention()

Fixed.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151273
-----------------------------------------------------------




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151273
-----------------------------------------------------------



Looks good. Just a couple things below.


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment219623>

    Typo: deletion.retention.ms is not a valid property. 
    
    http://kafka.apache.org/documentation.html#configuration
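As a reminder of the correct Kafka topic-level property names, here is a minimal Java sketch of the properties a compacted changelog topic might be created with. cleanup.policy and delete.retention.ms are real Kafka topic configs; the class name, the method name and the one-day fallback (chosen to match Kafka's documented default for delete.retention.ms) are assumptions for illustration, not code from the patch.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

class ChangelogTopicProperties {
  // Fallback used when no per-store value is configured; assumed here to match
  // Kafka's documented default for delete.retention.ms (one day).
  static final long FALLBACK_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);

  // Builds topic-level configs for a compacted changelog topic.
  // The Kafka property is "delete.retention.ms"; "deletion.retention.ms" is not a Kafka topic config.
  static Properties forChangelog(Long deleteRetentionMs) {
    long retentionMs = deleteRetentionMs != null ? deleteRetentionMs : FALLBACK_DELETE_RETENTION_MS;
    Properties props = new Properties();
    props.setProperty("cleanup.policy", "compact");
    props.setProperty("delete.retention.ms", Long.toString(retentionMs));
    return props;
  }
}
```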



samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala (line 293)
<https://reviews.apache.org/r/52476/#comment219633>

    2 nits:
    1. Can you swap this test with the next one in terms of position? The tests above and below this one are related, so this one breaks them up, which just adds cognitive load for the reader.
    2. I'm all for descriptive names, but this is almost un-tweet-able. :-) Could it be shortened to: testStoreDeletedWhenOffsetFileOlderThanDeleteRetention()


- Jake Maes




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Boris Shkolnik <bo...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153476
-----------------------------------------------------------


Ship it!




- Boris Shkolnik


On Oct. 19, 2016, 10:04 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2016, 10:04 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog from which some delete tombstones have already been removed produces an inconsistent local store: a key that was deleted at a changelog offset later than the one recorded in the store's offset file keeps its old value locally, because the tombstone that would have removed it is gone. During container startup, this patch therefore deletes every local store whose offset file was last modified more than delete.retention.ms ago.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java, line 254
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541854#file1541854line254>
> >
> >     Does jobConfig.getChangeLog...() (implicit conversion) not work?

No, the implicit conversion doesn't work here. The convenience method is part of the StorageConfig class.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 130
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line130>
> >
> >     First %s should be store name. Add another %s at the end for loggedStoreDir.

This log message belongs to the task store, which itself already contains the store name, so adding the store name here is unnecessary.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala, line 144
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541859#file1541859line144>
> >
> >     implicit conversion should probably work.

No, the implicit conversion doesn't work here; that is why the object is created explicitly.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 186
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line186>
> >
> >     s/partition/logged storage partition to be consistent with next message.

Done.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 133
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line133>
> >
> >     Log both last modified time and delete retention ms values too.

Done.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 57
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541855#file1541855line57>
> >
> >     getChangeLogDeleteRetentionsInMs

Done.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
-----------------------------------------------------------


On Oct. 22, 2016, 10:06 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 22, 2016, 10:06 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog from which some delete tombstones have already been removed produces an inconsistent local store: a key that was deleted at a changelog offset later than the one recorded in the store's offset file keeps its old value locally, because the tombstone that would have removed it is gone. During container startup, this patch therefore deletes every local store whose offset file was last modified more than delete.retention.ms ago.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
-----------------------------------------------------------



Looks pretty good, few final comments.


samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java (line 253)
<https://reviews.apache.org/r/52476/#comment222805>

    Does jobConfig.getChangeLog...() (implicit conversion) not work?



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 57)
<https://reviews.apache.org/r/52476/#comment222811>

    getChangeLogDeleteRetentionsInMs



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 58)
<https://reviews.apache.org/r/52476/#comment222812>

    See if you can use the named operator instead of the symbolic operator.
    
    I think you might be able to call .toMap on the list of pairs.
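The suggestion above is about Scala; on the Java side (StorageRecovery.java is Java), the analogous idiom for turning (store, retention) pairs into a map is Collectors.toMap. A minimal sketch, assuming a flat Map<String, String> config and a per-store key of the form stores.<store>.changelog.delete.retention.ms; the key pattern, the default and the method name are hypothetical here, not taken from the patch.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class DeleteRetentionByStore {
  // Hypothetical per-store key pattern and default, for illustration only.
  static final String KEY_PATTERN = "stores.%s.changelog.delete.retention.ms";
  static final long DEFAULT_DELETE_RETENTION_MS = 24L * 60 * 60 * 1000;

  // Maps each store name to its delete retention in ms, falling back to the default.
  // Collectors.toMap throws IllegalStateException if a store name appears twice.
  static Map<String, Long> deleteRetentionByStore(Map<String, String> config, List<String> storeNames) {
    return storeNames.stream()
        .collect(Collectors.toMap(
            Function.identity(),
            storeName -> {
              String raw = config.get(String.format(KEY_PATTERN, storeName));
              return raw == null ? DEFAULT_DELETE_RETENTION_MS : Long.parseLong(raw);
            }));
  }
}
```

Because Collectors.toMap rejects duplicate keys, this sketch assumes store names are unique, which they normally are within one job config.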



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 104)
<https://reviews.apache.org/r/52476/#comment222827>

    Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 106)
<https://reviews.apache.org/r/52476/#comment222821>

    If loggedStoreDir isn't present, we put null into fileOffset. If that's the expected behavior, let's log at info in isStateLoggedStore if (!loggedStoreDir.exists()).



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 108)
<https://reviews.apache.org/r/52476/#comment222815>

    Misleading comment, could be the other condition too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 111)
<https://reviews.apache.org/r/52476/#comment222818>

    It will be useful to log the read file offset at info here (or in readOffsetFile).
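The two comments above (a null offset when loggedStoreDir is missing, and logging the offset that was read) could be addressed along the lines of this Java sketch. The offset file name OFFSET, the class and method names, and the use of java.util.logging are assumptions for illustration; TaskStorageManager itself is Scala and uses its own logging.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.logging.Logger;

class OffsetFileReader {
  private static final Logger LOG = Logger.getLogger(OffsetFileReader.class.getName());

  // Assumed file name for illustration only.
  static final String OFFSET_FILE_NAME = "OFFSET";

  // Returns the offset stored for this logged store, or null when there is no
  // reusable local state (missing directory or missing offset file).
  static String readOffset(File loggedStoreDir) {
    if (!loggedStoreDir.exists()) {
      LOG.info("Logged store dir " + loggedStoreDir + " does not exist; no offset to read.");
      return null;
    }
    File offsetFile = new File(loggedStoreDir, OFFSET_FILE_NAME);
    if (!offsetFile.exists()) {
      LOG.info("No offset file found in " + loggedStoreDir + ".");
      return null;
    }
    try {
      byte[] bytes = Files.readAllBytes(offsetFile.toPath());
      String offset = new String(bytes, StandardCharsets.UTF_8).trim();
      LOG.info("Read offset " + offset + " from " + offsetFile + ".");
      return offset;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Writes the offset file; included here only to set up the example.
  static void writeOffset(File loggedStoreDir, String offset) {
    try {
      Files.write(new File(loggedStoreDir, OFFSET_FILE_NAME).toPath(),
          offset.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Returning null keeps the "no local state" case explicit for the caller; an Optional<String> would be an equally reasonable choice in Java.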



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 112)
<https://reviews.apache.org/r/52476/#comment222813>

    Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 118)
<https://reviews.apache.org/r/52476/#comment222819>

    Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 130)
<https://reviews.apache.org/r/52476/#comment222824>

    First %s should be store name. Add another %s at the end for loggedStoreDir.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 133)
<https://reviews.apache.org/r/52476/#comment222822>

    Log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 133)
<https://reviews.apache.org/r/52476/#comment222823>

    Log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 182)
<https://reviews.apache.org/r/52476/#comment222826>

    s/partition/logged storage partition to be consistent with next message.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala (line 144)
<https://reviews.apache.org/r/52476/#comment222830>

    implicit conversion should probably work.


- Prateek Maheshwari




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 20, 2016, 11:46 p.m., Boris Shkolnik wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 132
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line132>
> >
> >     Nit: using System.currentTimeMillis directly makes this difficult to unit test.

Done. Injected SystemClock dependency as a parameter.
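The fix in the reply above can be sketched in Java with java.time.Clock standing in for the injected SystemClock; Samza's own clock type is not shown in this thread, so the class below is hypothetical. Injecting the clock lets a test pin "now" with Clock.fixed instead of depending on System.currentTimeMillis.

```java
import java.io.File;
import java.time.Clock;

class StaleStoreCheck {
  private final Clock clock;

  // Production code would pass Clock.systemUTC(); tests pass Clock.fixed(...).
  StaleStoreCheck(Clock clock) {
    this.clock = clock;
  }

  // True when the offset file was last modified more than deleteRetentionMs ago,
  // i.e. tombstones written after that point may already have been compacted away.
  // Note: File.lastModified() returns 0L for a missing file, which counts as stale.
  boolean isStale(File offsetFile, long deleteRetentionMs) {
    long ageMs = clock.millis() - offsetFile.lastModified();
    return ageMs > deleteRetentionMs;
  }
}
```

Whether a missing offset file should count as stale depends on how the caller treats stores without one; the strict `>` means a store exactly delete.retention.ms old is still reused.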


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153468
-----------------------------------------------------------




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Boris Shkolnik <bo...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153468
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 44)
<https://reviews.apache.org/r/52476/#comment222787>

    Nit: can we call it storeName?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 132)
<https://reviews.apache.org/r/52476/#comment222789>

    Nit: using System.currentTimeMillis directly makes this difficult to unit test.



samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala (line 323)
<https://reviews.apache.org/r/52476/#comment222800>

    Isn't it easier just to make the method package-private?


- Boris Shkolnik




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 24, 2016, 9:43 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 138
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line138>
> >
> >     s/is greater than/is older than.

Fixed.


> On Oct. 24, 2016, 9:43 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 32
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543708#file1543708line32>
> >
> >     s/CHANGE_LOG/CHANGELOG

Fixed.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153741
-----------------------------------------------------------




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153741
-----------------------------------------------------------


Ship it!




Looks good to me, thanks.


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment223156>

    s/CHANGE_LOG/CHANGELOG



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 138)
<https://reviews.apache.org/r/52476/#comment223158>

    s/is greater than/is older than.


- Prateek Maheshwari




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Jan. 25, 2017, 10:28 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 97
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line97>
> >
> >     "Got default storage ..."

Changed.


> On Jan. 25, 2017, 10:28 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 137
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line137>
> >
> >     No space before ":" here and elsewhere, including method type annotations (e.g. line 169, 185).

Fixed.


> On Jan. 25, 2017, 10:28 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 152
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line152>
> >
> >     We should log which directory we're using for the store here at INFO.

Added a log statement.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review163035
-----------------------------------------------------------


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review163035
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 97)
<https://reviews.apache.org/r/52476/#comment234449>

    "Got default storage ..."



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 137)
<https://reviews.apache.org/r/52476/#comment234451>

    No space before ":" here and elsewhere, including method type annotations (e.g. line 169, 185).



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 152)
<https://reviews.apache.org/r/52476/#comment234450>

    We should log which directory we're using for the store here at INFO.
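A minimal sketch of the requested INFO log line, written in Java (samza-core mixes Java and Scala sources). It makes three assumptions. First, store directories are laid out as `<baseDir>/<storeName>/<taskName>`. Second, `java.util.logging` stands in for the project's own logger. Third, `StoreDirLogging` and `chooseStoreDir` are hypothetical names that do not come from the patch.

```java
import java.io.File;
import java.util.logging.Logger;

/** Hypothetical helper: picks the store directory and logs the choice at INFO. */
final class StoreDirLogging {
  private static final Logger LOG = Logger.getLogger(StoreDirLogging.class.getName());

  private StoreDirLogging() {}

  static File chooseStoreDir(File loggedBaseDir, File nonLoggedBaseDir,
                             String storeName, String taskName, boolean isLoggedStore) {
    // Logged (changelog-backed) stores and non-logged stores live under different base dirs.
    File baseDir = isLoggedStore ? loggedBaseDir : nonLoggedBaseDir;
    File storeDir = new File(new File(baseDir, storeName), taskName);
    // Record which directory is used, so operators can find the store on disk.
    LOG.info(String.format("Using directory %s for store: %s", storeDir.getAbsolutePath(), storeName));
    return storeDir;
  }
}
```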


- Prateek Maheshwari


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Feb. 8, 2017, 5:17 p.m., Jake Maes wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 33
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543708#file1543708line33>
> >
> >     nit: TimeUnit.DAYS.toMillis(1)

Fixed.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164701
-----------------------------------------------------------


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164701
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)
<https://reviews.apache.org/r/52476/#comment236461>

    nit: TimeUnit.DAYS.toMillis(1)
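A sketch of the suggested nit, written in Java. It expresses the one-day default with `TimeUnit.DAYS.toMillis(1)` rather than a hand-multiplied literal; one day also matches Kafka's own default for `delete.retention.ms`. The per-store key format `stores.%s.changelog.delete.retention.ms` and the class and method names are assumptions for illustration. A plain `Map` stands in for Samza's `Config`.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical config accessor for a store's changelog delete retention. */
final class DeleteRetentionConfig {
  // Assumed key format; check StorageConfig for the real property name.
  static final String CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms";

  // One day, stated with TimeUnit instead of 24 * 60 * 60 * 1000.
  static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);

  private DeleteRetentionConfig() {}

  /** Returns the configured retention for storeName, or the one-day default if unset. */
  static long getDeleteRetentionMs(Map<String, String> config, String storeName) {
    String value = config.get(String.format(CHANGELOG_DELETE_RETENTION_MS, storeName));
    return value == null ? DEFAULT_CHANGELOG_DELETE_RETENTION_MS : Long.parseLong(value);
  }
}
```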



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (lines 170 - 171)
<https://reviews.apache.org/r/52476/#comment236464>

    The logic structure is weird. 
    
    We really shouldn't need to read the file in order to return whether the file is present and its last modified time. 
    
    It seems like we need an "isStoreValid()" method which
    1. Checks persistedStores.contains(storeName),
    2. Calls isStaleLoggedStore to verify the age, and
    3. Calls a new isOffsetFileValid method to verify the contents of the file.
    
    It returns true only if all of the above checks succeed.
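One way to realize the suggested structure, sketched in Java. The method names `isStoreValid`, `isStaleLoggedStore` and `isOffsetFileValid` follow the comment. The offset file name `OFFSET`, the parameter lists and the "non-blank contents" validity rule are assumptions. The three checks are chained with `&&`, so the age check uses only file metadata, and the file is read only when the store is persisted and not stale.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Set;

/** Hypothetical sketch of the isStoreValid() decomposition suggested in the review. */
final class StoreValidity {
  static final String OFFSET_FILE_NAME = "OFFSET"; // assumed file name

  private StoreValidity() {}

  /** True only if all three checks pass; evaluation stops at the first failing check. */
  static boolean isStoreValid(String storeName, Set<String> persistedStores, File storeDir,
                              long nowMs, long deleteRetentionMs) {
    File offsetFile = new File(storeDir, OFFSET_FILE_NAME);
    return persistedStores.contains(storeName)                        // 1. store is persisted
        && !isStaleLoggedStore(offsetFile, nowMs, deleteRetentionMs) // 2. age, metadata only
        && isOffsetFileValid(offsetFile);                            // 3. file contents
  }

  /** Uses only the modification time; the file is not read. */
  static boolean isStaleLoggedStore(File offsetFile, long nowMs, long deleteRetentionMs) {
    // File.lastModified() returns 0L for a missing file, so with a real clock
    // value for nowMs a missing offset file is treated as stale.
    return nowMs - offsetFile.lastModified() > deleteRetentionMs;
  }

  /** Reads the file and accepts it only if it contains a non-blank offset. */
  static boolean isOffsetFileValid(File offsetFile) {
    if (!offsetFile.isFile()) {
      return false;
    }
    try {
      String contents = new String(Files.readAllBytes(offsetFile.toPath()), StandardCharsets.UTF_8);
      return !contents.trim().isEmpty();
    } catch (IOException e) {
      return false;
    }
  }
}
```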


- Jake Maes


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Feb. 8, 2017, 8:43 p.m., Jake Maes wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 177
> > <https://reviews.apache.org/r/52476/diff/4-6/?file=1541857#file1541857line177>
> >
> >     Looks like this log statement belongs in an else-block. It says the store is stale, but in this block, we've determined hasValidOffsetFile=true.
> >     
> >     Also the wording isn't entirely accurate. The offset file may be present, but empty. It might be better to say "Offset file is not valid for store: %s"

My bad, this minor thing was missed while refactoring. Thanks for pointing it out. Fixed.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164765
-----------------------------------------------------------


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164765
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 173)
<https://reviews.apache.org/r/52476/#comment236527>

    Looks like this log statement belongs in an else-block. It says the store is stale, but in this block, we've determined hasValidOffsetFile=true.
    
    Also the wording isn't entirely accurate. The offset file may be present, but empty. It might be better to say "Offset file is not valid for store: %s"
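A small sketch of the control flow the comment asks for, written in Java. The failure message appears only in the else-branch, and it uses the suggested wording, because the file may exist but be empty. `OffsetFileLogging`, `readStartingOffset` and its parameters are hypothetical names. `java.util.logging` stands in for the project's logger.

```java
import java.util.Optional;
import java.util.logging.Logger;

/** Hypothetical sketch: log the "not valid" message only when the offset file is unusable. */
final class OffsetFileLogging {
  private static final Logger LOG = Logger.getLogger(OffsetFileLogging.class.getName());

  private OffsetFileLogging() {}

  /** Returns the offset to restore from, or empty if the store must be rebuilt from the changelog. */
  static Optional<String> readStartingOffset(String storeName, boolean hasValidOffsetFile, String fileContents) {
    if (hasValidOffsetFile) {
      String offset = fileContents.trim();
      LOG.fine(String.format("Read offset %s for store: %s", offset, storeName));
      return Optional.of(offset);
    } else {
      // Covers both a missing and an empty offset file, hence "not valid" rather than "stale".
      LOG.info(String.format("Offset file is not valid for store: %s", storeName));
      return Optional.empty();
    }
  }
}
```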


- Jake Maes


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 196
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line196>
> >
> >     s/in the store/for the store.

Fixed.


> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 199
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line199>
> >
> >     No space before colon, here and other places in this file.

Fixed at all the places.


> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 159
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line159>
> >
> >     No comma before 'if' if it's the last clause in the sentence.

Fixed.


> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 113
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line113>
> >
> >     s/of the store/for the store.

Fixed.


> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 120
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line120>
> >
> >     @param doesn't go in the method description, use {@code}

Fixed.


> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 135
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line135>
> >
> >     Explain what stale means here.
> >     
> >     Use @code instead of @param here.

Fixed.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164786
-----------------------------------------------------------


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164786
-----------------------------------------------------------


Fix it, then Ship it!




LGTM, thanks. A few code-style and documentation-related comments.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 107)
<https://reviews.apache.org/r/52476/#comment236555>

    This comment can be deleted: it doesn't describe the entire block, and it is unnecessary for the first part.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 113)
<https://reviews.apache.org/r/52476/#comment236557>

    s/of the store/for the store.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 120)
<https://reviews.apache.org/r/52476/#comment236554>

    @param doesn't go in the method description, use {@code}



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 135)
<https://reviews.apache.org/r/52476/#comment236559>

    Explain what stale means here.
    
    Use @code instead of @param here.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 138)
<https://reviews.apache.org/r/52476/#comment236561>

    Can just say "true if the store is stale". Description of what stale means should go in the method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 159)
<https://reviews.apache.org/r/52476/#comment236562>

    No comma before 'if' if it's the last clause in the sentence.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 196)
<https://reviews.apache.org/r/52476/#comment236565>

    s/in the store/for the store.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 198)
<https://reviews.apache.org/r/52476/#comment236563>

    No space before colon, here and other places in this file.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 202)
<https://reviews.apache.org/r/52476/#comment236564>

    Thanks!


- Prateek Maheshwari


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164782
-----------------------------------------------------------


Ship it!




Ship It!

- Jake Maes


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

(Updated Feb. 8, 2017, 9:37 p.m.)


Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged produces an inconsistent local store, because those deletes are never applied. To prevent this, during container startup this patch deletes every local store whose offset file was last modified more than delete.retention.ms ago.
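A minimal sketch of the startup rule described above, written in Java (samza-core has both Java and Scala sources). It is not the patch's code. The class name, the offset file name `OFFSET` and the one-directory-per-store layout are assumptions. A store counts as stale when `now - lastModified(offset file) > delete.retention.ms`, and stale store directories are removed before any restore happens.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of deleting stale local stores at container startup. */
final class StaleStoreCleanup {
  // Assumed name of the per-store offset file.
  static final String OFFSET_FILE_NAME = "OFFSET";

  private StaleStoreCleanup() {}

  /** A store is stale if its offset file is older than the changelog's delete.retention.ms. */
  static boolean isStale(long offsetLastModifiedMs, long nowMs, long deleteRetentionMs) {
    return nowMs - offsetLastModifiedMs > deleteRetentionMs;
  }

  /**
   * Deletes each store directory directly under baseDir whose offset file is stale and
   * returns the names of the stores it removed. Directories without an offset file are
   * left alone here; validating those is a separate check.
   */
  static List<String> deleteStaleStores(File baseDir, long nowMs, long deleteRetentionMs) {
    List<String> deleted = new ArrayList<>();
    File[] storeDirs = baseDir.listFiles(File::isDirectory);
    if (storeDirs == null) {
      return deleted; // baseDir is missing or unreadable
    }
    for (File storeDir : storeDirs) {
      File offsetFile = new File(storeDir, OFFSET_FILE_NAME);
      if (offsetFile.isFile() && isStale(offsetFile.lastModified(), nowMs, deleteRetentionMs)) {
        deleteRecursively(storeDir);
        deleted.add(storeDir.getName());
      }
    }
    return deleted;
  }

  // Deletes a file, or a directory together with everything beneath it.
  private static void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    f.delete();
  }
}
```

Comparing against the offset file's modification time avoids reading the file at all. The trade-off is that the check trusts the local filesystem clock.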


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala a3587d0a40c57374ee1742234929d444e381e42d 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c3308bfd7de04c335fef6cb66baa29286a230080 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9320cf744ff90d647a198b51cb06d2a526fe68fa 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

(Updated Feb. 8, 2017, 8 p.m.)


Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged produces an inconsistent local store, because those deletes are never applied. To prevent this, during container startup this patch deletes every local store whose offset file was last modified more than delete.retention.ms ago.


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala a3587d0a40c57374ee1742234929d444e381e42d 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c3308bfd7de04c335fef6cb66baa29286a230080 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9320cf744ff90d647a198b51cb06d2a526fe68fa 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164749
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment236506>

    Doesn't look like this is fixed? Did you miss updating with a patch?


- Prateek Maheshwari


On Feb. 8, 2017, 11:09 a.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Feb. 8, 2017, 11:09 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a kafka changelog topic. Due to log compaction, delete tombstones of the changelog topic have a ttl of delete.retention.ms. Replaying the events from the changelog that has missing delete tombstones, would result in creation of an inconsistent local store(due to the missing of some delete events). This patch deletes the local stores in which difference between current time and last modified time of the offset file is greater than delete.retention.ms during the container startup.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>


Re: Review Request 52476: SAMZA-1083 : Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

(Updated Feb. 8, 2017, 7:09 p.m.)


Review request for samza.


Summary (updated)
-----------------

SAMZA-1083 : Do not load task store which are older than delete tombstones.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged would create an inconsistent local store, since some delete events would be missing. During container startup, this patch deletes any local store whose offset file was last modified more than delete.retention.ms ago.
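The startup check described above can be sketched roughly as follows. This is a minimal, hypothetical Scala sketch, not the code in the patch: it assumes the offset file is named OFFSET and sits directly inside the store directory, and the names StaleStoreCheck, isStale, deleteRecursively and deleteIfStale are made up for illustration.

```scala
import java.io.File

// Hypothetical helper; names are illustrative, not taken from the patch.
// Assumes the store's offset file is called "OFFSET" and lives in the store directory.
object StaleStoreCheck {

  // A store is stale when its offset file was last written more than deleteRetentionMs
  // ago: tombstones it never applied may already have been compacted out of the changelog.
  def isStale(offsetFileLastModifiedMs: Long, nowMs: Long, deleteRetentionMs: Long): Boolean =
    nowMs - offsetFileLastModifiedMs > deleteRetentionMs

  // Deletes a file or a whole directory tree; true if nothing remains afterwards.
  def deleteRecursively(f: File): Boolean = {
    if (f.isDirectory) {
      // listFiles() returns null on an I/O error, so guard it with Option.
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete() || !f.exists()
  }

  // Intended to run at container startup, before restoring the store from its changelog.
  // Returns true only if the store was stale and its directory was removed.
  def deleteIfStale(storeDir: File, deleteRetentionMs: Long, nowMs: Long): Boolean = {
    val offsetFile = new File(storeDir, "OFFSET")
    val stale = offsetFile.exists() && isStale(offsetFile.lastModified(), nowMs, deleteRetentionMs)
    stale && deleteRecursively(storeDir)
  }
}
```

Passing nowMs in explicitly, rather than reading the system time inside the check, keeps the logic deterministic in unit tests.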


Diffs
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

(Updated Oct. 22, 2016, 10:06 p.m.)


Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged would create an inconsistent local store, since some delete events would be missing. During container startup, this patch deletes any local store whose offset file was last modified more than delete.retention.ms ago.


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

(Updated Oct. 19, 2016, 10:04 p.m.)


Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged would create an inconsistent local store, since some delete events would be missing. During container startup, this patch deletes any local store whose offset file was last modified more than delete.retention.ms ago.


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 156
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line156>
> >
> >     Unrelated, but let's make this info.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 33
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539988#file1539988line33>
> >
> >     Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 60 * 1000L

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 532
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539989#file1539989line532>
> >
> >     Prefer passing the one config that we need explicitly instead of passing the config object.

Changed to pass a changeLogDeleteRetentions map instead of the config object.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 29
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line29>
> >
> >     Unrelated to RB but prefer explicit imports.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 26
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line26>
> >
> >     Delete or import explicitly.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 107
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line107>
> >
> >     Add method description.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 114
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line114>
> >
> >     Another case we ran into on Friday - if the oldest offset in the changelog topic is newer than the offset in the OFFSET file. Do you need to handle that here?
> >     
> >     Nitpick: would isStaleStore be clearer?

Discussed offline. This is a normal scenario with compaction (and with message expiration in topics generally). If the offset in the OFFSET file is older than the oldest offset in the changelog, compaction has happened. To avoid missing messages, consumption has to start from the oldest offset in the changelog, which is controlled by the config parameter systems.name.consumer.auto.offset.reset.
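The reset behaviour referred to above can be modelled roughly as below. This is a hypothetical simplification, not Samza or Kafka consumer code: offsets are plain Longs, logEndOffset stands for the offset the next message would get, and Smallest/Largest mirror the smallest/largest values accepted by the older Kafka consumer's auto.offset.reset setting.

```scala
// Hypothetical model of auto.offset.reset; not Samza's consumer implementation.
object OffsetReset {
  sealed trait Policy
  case object Smallest extends Policy // restart from the oldest retained offset
  case object Largest extends Policy  // skip ahead to the end of the log

  // Valid offsets are [oldest, logEndOffset]. A checkpoint inside that range is used
  // as-is; one outside it (e.g. older than the oldest retained message) is reset.
  def startingOffset(checkpointed: Long, oldest: Long, logEndOffset: Long, policy: Policy): Long =
    if (checkpointed >= oldest && checkpointed <= logEndOffset) checkpointed
    else policy match {
      case Smallest => oldest
      case Largest  => logEndOffset
    }
}
```

With Smallest, a checkpoint that falls below the oldest retained offset resumes at the oldest offset, so no retained changelog message is skipped.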


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 122
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line122>
> >
> >     Mention somewhere in the message that this means that the store is stale.

Done.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
-----------------------------------------------------------




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
-----------------------------------------------------------




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)
<https://reviews.apache.org/r/52476/#comment222170>

    Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 60 * 1000L
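For reference, a one-day default written the way the reviewer suggests; the constant name is hypothetical. 86400000 ms is also Kafka's documented default for the topic-level delete.retention.ms setting.

```scala
object RetentionDefaults {
  // One day in milliseconds, spelled out as days * hours * minutes * seconds * millis.
  // The trailing L makes the final multiplication Long arithmetic.
  val DefaultChangeLogDeleteRetentionMs: Long = 1 * 24 * 60 * 60 * 1000L
}
```

The L on the last factor matters for longer durations: if every factor were an Int literal, any value longer than about 24.8 days would overflow Int before being widened.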



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 532)
<https://reviews.apache.org/r/52476/#comment222189>

    Prefer passing the one config that we need explicitly instead of passing the config object.
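A hypothetical sketch of passing only what is needed: retention values are resolved per store once, and the consumer receives a Map[String, Long] (along the lines of the changeLogDeleteRetentions map mentioned elsewhere in this thread) instead of the whole config. The config key format and the object and class names are assumptions made for illustration.

```scala
// Hypothetical: resolve each store's retention once, up front, so the storage
// manager receives a Map[String, Long] rather than the whole job config.
object ChangeLogDeleteRetentions {
  def resolve(storeNames: Seq[String], jobConfig: Map[String, String], defaultMs: Long): Map[String, Long] =
    storeNames.map { store =>
      val key = s"stores.$store.changelog.delete.retention.ms" // assumed key format
      store -> jobConfig.get(key).map(_.toLong).getOrElse(defaultMs)
    }.toMap
}

// Depends only on the values it uses, which keeps it trivial to construct in tests.
final class StoreRetentionPolicy(changeLogDeleteRetentions: Map[String, Long]) {
  def isStale(store: String, ageMs: Long): Boolean =
    changeLogDeleteRetentions.get(store).exists(ageMs > _)
}
```

In this sketch a store with no entry is never treated as stale; whether that is the right default is a design decision the sketch does not settle.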



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 26)
<https://reviews.apache.org/r/52476/#comment222171>

    Delete or import explicitly.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 29)
<https://reviews.apache.org/r/52476/#comment222190>

    Unrelated to RB but prefer explicit imports.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 71)
<https://reviews.apache.org/r/52476/#comment222173>

    SystemClock exists so that you can pass a "Clock" to your method/class and mock it in tests. Let's either do that (preferred) or use System.currentTimeMillis() directly.
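A minimal sketch of the clock-injection approach the reviewer prefers. The Clock trait, SystemClock object, ManualClock test double and StalenessChecker class are all defined here for illustration; Samza's own clock types may have different names or signatures.

```scala
// Illustrative stand-ins; not Samza's actual Clock/SystemClock definitions.
trait Clock {
  def currentTimeMillis(): Long
}

object SystemClock extends Clock {
  override def currentTimeMillis(): Long = System.currentTimeMillis()
}

// Test double: time only moves when the test advances it.
final class ManualClock(private var nowMs: Long) extends Clock {
  override def currentTimeMillis(): Long = nowMs
  def advance(deltaMs: Long): Unit = nowMs += deltaMs
}

// Production code takes the clock as a dependency and defaults to the real one.
final class StalenessChecker(deleteRetentionMs: Long, clock: Clock = SystemClock) {
  def isStale(lastModifiedMs: Long): Boolean =
    clock.currentTimeMillis() - lastModifiedMs > deleteRetentionMs
}
```

A test can then construct the checker with a ManualClock and move time forward explicitly, without sleeping or touching file timestamps.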



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 98)
<https://reviews.apache.org/r/52476/#comment222184>

    Looks like we've updated `fileOffset` in `#readOffsetFile` as a side effect even when the store is stale. Is that what we want here?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 100)
<https://reviews.apache.org/r/52476/#comment222175>

    Add an INFO message here.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 106)
<https://reviews.apache.org/r/52476/#comment222176>

    Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 113)
<https://reviews.apache.org/r/52476/#comment222177>

    Another case we ran into on Friday - if the oldest offset in the changelog topic is newer than the offset in the OFFSET file. Do you need to handle that here?
    
    Nitpick: would isStaleStore be clearer?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 118)
<https://reviews.apache.org/r/52476/#comment222180>

    Looks like this is already logged at line 163?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 119)
<https://reviews.apache.org/r/52476/#comment222179>

    Don't `return` in scala code.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 121)
<https://reviews.apache.org/r/52476/#comment222181>

    Mention somewhere in the message that this means that the store is stale.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 150)
<https://reviews.apache.org/r/52476/#comment222178>

    I'd prefer to split this into two methods - existence check and file read. Would be even nicer if fileOffset was updated explicitly (after staleness checks etc.) and not as a side effect of reading the file.
    
    If you don't, let's add return type to method signature.
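One possible shape for the split the reviewer asks for, sketched under assumptions: the OFFSET file name and all method names are hypothetical, and the offset is kept as a plain String. The existence check and the read are separate methods, neither mutates state, and the combined result is an if/else expression rather than an early `return`.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical sketch; names are illustrative, not from the patch.
object OffsetFiles {

  def offsetFile(storeDir: File): File = new File(storeDir, "OFFSET")

  // 1. Existence check only.
  def hasOffsetFile(storeDir: File): Boolean = offsetFile(storeDir).exists()

  // 2. File read only; call it after hasOffsetFile returned true.
  def readOffset(storeDir: File): String =
    new String(Files.readAllBytes(offsetFile(storeDir).toPath), StandardCharsets.UTF_8).trim

  // Combines both with the staleness decision; no side effects and no `return`.
  def restoreOffset(storeDir: File, isStale: Long => Boolean): Option[String] =
    if (hasOffsetFile(storeDir) && !isStale(offsetFile(storeDir).lastModified())) Some(readOffset(storeDir))
    else None
}
```

With this shape, the caller updates its own state (fileOffset, in the review's terms) in one explicit assignment after the staleness decision, instead of as a side effect of reading the file.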



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 155)
<https://reviews.apache.org/r/52476/#comment222182>

    Unrelated, but let's make this info.


- Prateek Maheshwari


On Oct. 17, 2016, 3:40 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 17, 2016, 3:40 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged would create an inconsistent local store, since some delete events would be missing. During container startup, this patch deletes any local store whose offset file was last modified more than delete.retention.ms ago.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------

(Updated Oct. 17, 2016, 10:40 p.m.)


Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged would create an inconsistent local store, since some delete events would be missing. During container startup, this patch deletes any local store whose offset file was last modified more than delete.retention.ms ago.


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing. 
> > 
> > In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property. 
> > 
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
> 
> Shanthoosh Venkataraman wrote:
>     I think 1) is the only plausible way to accomplish this through job config. The delete.retention.ms setting applies only to store changelogs, not to topics in general, so making it a topic-level property might not be a good idea. Enforcing delete.retention.ms through config is harder, since Kafka is an external system. Ideally, if there were a way to fetch a topic's Kafka metadata/config (delete.retention.ms), we could read that value during container startup rather than expecting users to specify it.
> 
> Jake Maes wrote:
>     Please take a look at org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties

Done.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing. 
> > 
> > In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property. 
> > 
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.

I think 1) is the only plausible way to accomplish this through job config. The delete.retention.ms setting applies only to store changelogs, not to topics in general, so making it a topic-level property might not be a good idea. Enforcing delete.retention.ms through config is harder, since Kafka is an external system. Ideally, if there were a way to fetch a topic's Kafka metadata/config (delete.retention.ms), we could read that value during container startup rather than expecting users to specify it.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------


On Oct. 4, 2016, 11:33 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 4, 2016, 11:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been purged would create an inconsistent local store, since some delete events would be missing. During container startup, this patch deletes any local store whose offset file was last modified more than delete.retention.ms ago.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Jake Maes <jm...@apache.org>.

> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing. 
> > 
> > In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property. 
> > 
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
> 
> Shanthoosh Venkataraman wrote:
>     I think 1) is the only plausible way to accomplish this through job config. The delete.retention.ms setting applies only to store changelogs, not to topics in general, so making it a topic-level property might not be a good idea. Enforcing delete.retention.ms through config is harder, since Kafka is an external system. Ideally, if there were a way to fetch a topic's Kafka metadata/config (delete.retention.ms), we could read that value during container startup rather than expecting users to specify it.

Please take a look at org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------




Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------



Looks better, but I think there's still one major part missing. 

In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property. 

There are 2 ways to do this:
1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
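Option 1 could look roughly like the sketch below: the changelog topic is created with the same delete.retention.ms that the task storage uses for its staleness check. The config key stores.<store>.changelog.delete.retention.ms, the object name and the one-day default are assumptions for illustration; cleanup.policy and delete.retention.ms are standard Kafka topic-level settings.

```scala
import java.util.Properties

// Hypothetical sketch: the Samza-side key below is assumed, not a confirmed config name.
object ChangelogTopicProps {
  val DefaultDeleteRetentionMs: Long = 24 * 60 * 60 * 1000L // one day

  // Builds the topic-level properties to use when creating a store's changelog topic,
  // so Kafka keeps tombstones exactly as long as the task storage assumes it does.
  def changelogTopicProperties(storeName: String, jobConfig: Map[String, String]): Properties = {
    val retentionMs = jobConfig
      .get(s"stores.$storeName.changelog.delete.retention.ms")
      .map(_.toLong)
      .getOrElse(DefaultDeleteRetentionMs)
    val props = new Properties()
    props.setProperty("cleanup.policy", "compact")                 // changelogs are log-compacted
    props.setProperty("delete.retention.ms", retentionMs.toString) // how long tombstones survive
    props
  }
}
```

Reading both the topic setting and the staleness threshold from the same config value is what keeps the two in agreement.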

- Jake Maes


On Oct. 4, 2016, 11:33 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 4, 2016, 11:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog from which some delete tombstones have already been compacted away would produce an inconsistent local store, because those deletes would never be applied. This patch therefore deletes, during container startup, every local store whose offset file was last modified more than delete.retention.ms ago.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>


Re: Review Request 52476: Do not load task store which are older than delete tombstones.

Posted by Shanthoosh Venkataraman <sa...@gmail.com>.

(Updated Oct. 4, 2016, 11:33 p.m.)


Review request for samza.


Repository: samza


Description
-------

Every local task store is backed up by a Kafka changelog topic. Because the changelog topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog from which some delete tombstones have already been compacted away would produce an inconsistent local store, because those deletes would never be applied. This patch therefore deletes, during container startup, every local store whose offset file was last modified more than delete.retention.ms ago.
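The startup check described above might look roughly like the Java sketch below. It assumes each store lives in its own directory containing an offset file, named `OFFSET` here purely as an assumption, and that the retention passed in equals the changelog topic's delete.retention.ms. `isStale`, `deleteIfStale` and `deleteRecursively` are hypothetical helpers, not the methods in this patch.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StaleStoreCleanupSketch {
    // Assumed name of the offset file kept inside each store directory.
    static final String OFFSET_FILE_NAME = "OFFSET";

    // Stale means the offset file is older than the tombstone retention window,
    // so deletes written to the changelog since then may already be compacted away.
    static boolean isStale(File storeDir, long deleteRetentionMs, long nowMs) {
        File offsetFile = new File(storeDir, OFFSET_FILE_NAME);
        if (!offsetFile.isFile()) {
            return false; // nothing to compare against; this sketch leaves the store alone
        }
        return nowMs - offsetFile.lastModified() > deleteRetentionMs;
    }

    // Deletes a directory tree; reverse path order removes children before parents.
    static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Removes a stale store so it is restored from the changelog; returns true if deleted.
    static boolean deleteIfStale(File storeDir, long deleteRetentionMs, long nowMs) throws IOException {
        if (!isStale(storeDir, deleteRetentionMs, nowMs)) {
            return false;
        }
        deleteRecursively(storeDir.toPath());
        return true;
    }

    public static void main(String[] args) throws IOException {
        long retentionMs = 24L * 60 * 60 * 1000; // e.g. Kafka's default delete.retention.ms
        Path storeDir = Files.createTempDirectory("store");
        Path offset = storeDir.resolve(OFFSET_FILE_NAME);
        Files.write(offset, "0".getBytes(StandardCharsets.UTF_8));
        long now = System.currentTimeMillis();
        // Pretend the store has been untouched for two retention windows.
        offset.toFile().setLastModified(now - 2 * retentionMs);
        System.out.println(deleteIfStale(storeDir.toFile(), retentionMs, now)); // true
        System.out.println(Files.exists(storeDir));                            // false
    }
}
```

Using the offset file's last-modified time as the age of the store avoids reading the store itself; the trade-off is that any idle period longer than the retention window forces a full restore, even if no deletes actually happened in that window.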


Diffs (updated)
-----

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
-------

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman