Posted to dev@samza.apache.org by Shanthoosh Venkataraman <sa...@gmail.com> on 2016/10/03 17:00:50 UTC
Review Request 52476: Do not load task store which are older than
delete tombstones.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. Because the topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose tombstones have already been removed would create an inconsistent local store, because some delete events would be missing. During container startup, this patch deletes any local store for which the difference between the current time and the last modified time of its offset file is greater than delete.retention.ms.
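The staleness rule described above can be sketched in a few lines. This is only an illustration under stated assumptions: the class and method names (OffsetFileStaleness, isStale) are hypothetical and do not come from the patch, and the sketch is written in Java for brevity even though the real change lives mostly in TaskStorageManager.scala.

```java
import java.io.File;

/** Hypothetical helper; the names are illustrative and not taken from the patch. */
final class OffsetFileStaleness {
    private OffsetFileStaleness() {}

    /**
     * Returns true when the offset file was last written more than
     * deleteRetentionMs ago, i.e. tombstones produced since then may
     * already have been removed from the changelog by compaction.
     */
    static boolean isStale(long lastModifiedMs, long nowMs, long deleteRetentionMs) {
        return nowMs - lastModifiedMs > deleteRetentionMs;
    }

    /** Convenience overload that reads the timestamp from the file itself. */
    static boolean isStale(File offsetFile, long nowMs, long deleteRetentionMs) {
        // File.lastModified() returns 0L when the file does not exist, so a
        // missing offset file is reported as stale for any realistic clock value.
        return isStale(offsetFile.lastModified(), nowMs, deleteRetentionMs);
    }
}
```

A caller would delete the store directory when isStale returns true and restore the store from the changelog from scratch; a store whose offset file is exactly delete.retention.ms old is still kept by this sketch, matching the "greater than" wording above.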
Diffs
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit testing and manual testing have been done to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 4, 2016, 1:57 a.m., Jake Maes wrote:
> > samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala, line 293
> > <https://reviews.apache.org/r/52476/diff/1/?file=1518377#file1518377line293>
> >
> > 2 nits:
> > 1. Can you swap this test with the next one in terms of position? The tests above and below this one are related, so this one breaks them up, which just adds cognitive load for the reader.
> > 2. I'm all for descriptive names, but this is almost un-tweet-able. :-) Could it be shortened to: testStoreDeletedWhenOffsetFileOlderThanDeleteRetention()?
Fixed.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151273
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151273
-----------------------------------------------------------
Looks good. Just a couple things below.
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment219623>
Typo: deletion.retention.ms is not a valid property.
http://kafka.apache.org/documentation.html#configuration
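To illustrate why the misspelling matters, here is a minimal sketch, assuming the retention is read from a plain string map of topic-level settings. The class ChangelogRetentionConfig and its method are hypothetical and are not part of Samza; only the property name delete.retention.ms and its 24-hour default (86400000 ms) come from the Kafka documentation.

```java
import java.util.Map;

/** Hypothetical reader for the changelog topic's delete retention. */
final class ChangelogRetentionConfig {
    // Kafka's topic-level property name; "deletion.retention.ms" is not a Kafka property.
    static final String DELETE_RETENTION_MS = "delete.retention.ms";

    // Kafka's documented default for delete.retention.ms: 24 hours.
    static final long DEFAULT_DELETE_RETENTION_MS = 86_400_000L;

    private ChangelogRetentionConfig() {}

    /** Returns the configured retention, or the Kafka default when the key is absent. */
    static long deleteRetentionMs(Map<String, String> topicConfig) {
        String raw = topicConfig.get(DELETE_RETENTION_MS);
        return raw == null ? DEFAULT_DELETE_RETENTION_MS : Long.parseLong(raw.trim());
    }
}
```

In this sketch a value supplied under the misspelled key is never read, so the lookup silently falls back to the default instead of failing, which is why the typo is easy to miss.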
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala (line 293)
<https://reviews.apache.org/r/52476/#comment219633>
2 nits:
1. Can you swap this test with the next one in terms of position? The tests above and below this one are related, so this one breaks them up, which just adds cognitive load for the reader.
2. I'm all for descriptive names, but this is almost un-tweet-able. :-) Could it be shortened to: testStoreDeletedWhenOffsetFileOlderThanDeleteRetention()?
- Jake Maes
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Boris Shkolnik <bo...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153476
-----------------------------------------------------------
Ship it!
- Boris Shkolnik
On Oct. 19, 2016, 10:04 p.m., Shanthoosh Venkataraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
>
> (Updated Oct. 19, 2016, 10:04 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Every local task store is backed by a Kafka changelog topic. Because the topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose tombstones have already been removed would create an inconsistent local store, because some delete events would be missing. During container startup, this patch deletes any local store for which the difference between the current time and the last modified time of its offset file is greater than delete.retention.ms.
>
>
> Diffs
> -----
>
> samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
> samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
> samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
> samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
>
> Diff: https://reviews.apache.org/r/52476/diff/
>
>
> Testing
> -------
>
> Unit testing and manual testing have been done to verify the functionality.
>
>
> Thanks,
>
> Shanthoosh Venkataraman
>
>
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java, line 254
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541854#file1541854line254>
> >
> > Does jobConfig.getChangeLog...() (implicit conversion) not work?
No, the implicit conversion doesn't work here. The convenience method is part of the StorageConfig class.
> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 130
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line130>
> >
> > First %s should be store name. Add another %s at the end for loggedStoreDir.
This log message belongs to the task store, which would itself contain the store name. Adding the store name here is unnecessary.
> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala, line 144
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541859#file1541859line144>
> >
> > implicit conversion should probably work.
No, implicit conversion doesn't work here; that is the reason for creating the object explicitly.
> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 186
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line186>
> >
> > s/partition/logged storage partition to be consistent with next message.
Done.
> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 133
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line133>
> >
> > Log both last modified time and delete retention ms values too.
Done.
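A message carrying both values might look like the sketch below. This is an assumption about the shape of such a log line, not the wording that was committed; the class StaleStoreLogMessage and its format method are hypothetical.

```java
import java.util.Locale;

/** Hypothetical formatter for the "store is too old" log line. */
final class StaleStoreLogMessage {
    private StaleStoreLogMessage() {}

    /** Builds a message that includes the directory, the offset file timestamp, and the retention. */
    static String format(String storeDir, long lastModifiedMs, long deleteRetentionMs) {
        // Locale.ROOT keeps the digits locale-independent.
        return String.format(
            Locale.ROOT,
            "Deleting logged store directory %s: offset file last modified time (%d ms) "
                + "is older than delete retention (%d ms).",
            storeDir, lastModifiedMs, deleteRetentionMs);
    }
}
```

Having both numbers in the line lets an operator confirm from the log alone why a store was discarded at startup.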
> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 57
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541855#file1541855line57>
> >
> > getChangeLogDeleteRetentionsInMs
Done.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
-----------------------------------------------------------
On Oct. 22, 2016, 10:06 p.m., Shanthoosh Venkataraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
>
> (Updated Oct. 22, 2016, 10:06 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Every local task store is backed by a Kafka changelog topic. Because the topic is log-compacted, its delete tombstones are retained only for delete.retention.ms. Replaying a changelog whose tombstones have already been removed would create an inconsistent local store, because some delete events would be missing. During container startup, this patch deletes any local store for which the difference between the current time and the last modified time of its offset file is greater than delete.retention.ms.
>
>
> Diffs
> -----
>
> samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
> samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
> samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
> samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
>
> Diff: https://reviews.apache.org/r/52476/diff/
>
>
> Testing
> -------
>
> Unit testing and manual testing have been done to verify the functionality.
>
>
> Thanks,
>
> Shanthoosh Venkataraman
>
>
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
-----------------------------------------------------------
Looks pretty good, a few final comments.
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java (line 253)
<https://reviews.apache.org/r/52476/#comment222805>
Does jobConfig.getChangeLog...() (implicit conversion) not work?
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 57)
<https://reviews.apache.org/r/52476/#comment222811>
getChangeLogDeleteRetentionsInMs
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 58)
<https://reviews.apache.org/r/52476/#comment222812>
See if you can use the named operator instead of the symbolic operator.
I think you might be able to .toMap on the list of pairs.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 104)
<https://reviews.apache.org/r/52476/#comment222827>
Indent by 4 (or whatever continuation indent is)
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 106)
<https://reviews.apache.org/r/52476/#comment222821>
If loggedStoreDir isn't present we put null into fileOffset. If that's the expected behavior, let's log at info in isStateLoggedStore if (!loggedStoreDir.exists())
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 108)
<https://reviews.apache.org/r/52476/#comment222815>
Misleading comment, could be the other condition too.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 111)
<https://reviews.apache.org/r/52476/#comment222818>
Will be useful to log at info the read file offset here (or in readOffsetFile)
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 112)
<https://reviews.apache.org/r/52476/#comment222813>
Indent by 4 (or whatever continuation indent is)
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 118)
<https://reviews.apache.org/r/52476/#comment222819>
Add method description.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 130)
<https://reviews.apache.org/r/52476/#comment222824>
First %s should be store name. Add another %s at the end for loggedStoreDir.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 133)
<https://reviews.apache.org/r/52476/#comment222822>
log both last modified time and delete retention ms values too.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 133)
<https://reviews.apache.org/r/52476/#comment222823>
Log both last modified time and delete retention ms values too.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 182)
<https://reviews.apache.org/r/52476/#comment222826>
s/partition/logged storage partition to be consistent with next message.
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala (line 144)
<https://reviews.apache.org/r/52476/#comment222830>
implicit conversion should probably work.
- Prateek Maheshwari
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 20, 2016, 11:46 p.m., Boris Shkolnik wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 132
> > <https://reviews.apache.org/r/52476/diff/4/?file=1541857#file1541857line132>
> >
> > nit. using System.currentTimeMillis directly makes it difficult to unit test.
Done. Injected SystemClock dependency as a parameter.
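The testability point can be illustrated with a small sketch. It assumes a clock modelled as a java.util.function.LongSupplier, which is only a stand-in for the clock dependency mentioned above; the class StoreStalenessChecker is hypothetical and not part of the patch.

```java
import java.util.function.LongSupplier;

/** Hypothetical checker that takes its time source as a constructor parameter. */
final class StoreStalenessChecker {
    private final LongSupplier clock;      // returns the current time in epoch milliseconds
    private final long deleteRetentionMs;  // changelog topic's delete.retention.ms

    StoreStalenessChecker(LongSupplier clock, long deleteRetentionMs) {
        this.clock = clock;
        this.deleteRetentionMs = deleteRetentionMs;
    }

    /** True when the offset file is older than the delete retention window. */
    boolean isStale(long offsetFileLastModifiedMs) {
        return clock.getAsLong() - offsetFileLastModifiedMs > deleteRetentionMs;
    }
}
```

Production code would pass System::currentTimeMillis, while a unit test can pass a fixed value such as () -> 10_000L and assert on both sides of the retention boundary without sleeping or touching file timestamps.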
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153468
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Boris Shkolnik <bo...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153468
-----------------------------------------------------------
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 44)
<https://reviews.apache.org/r/52476/#comment222787>
nit. Can we call it storeName?
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 132)
<https://reviews.apache.org/r/52476/#comment222789>
nit. using System.currentTimeMillis directly makes it difficult to unit test.
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala (line 323)
<https://reviews.apache.org/r/52476/#comment222800>
Isn't it easier just to make the method package-private?
- Boris Shkolnik
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 24, 2016, 9:43 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 138
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line138>
> >
> > s/is greater than/is older than.
Fixed.
> On Oct. 24, 2016, 9:43 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 32
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543708#file1543708line32>
> >
> > s/CHANGE_LOG/CHANGELOG
Fixed.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153741
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153741
-----------------------------------------------------------
Ship it!
Looks good to me, thanks.
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment223156>
s/CHANGE_LOG/CHANGELOG
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 138)
<https://reviews.apache.org/r/52476/#comment223158>
s/is greater than/is older than.
- Prateek Maheshwari
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Jan. 25, 2017, 10:28 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 97
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line97>
> >
> > "Got default storage ..."
Changed.
> On Jan. 25, 2017, 10:28 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 137
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line137>
> >
> > No space before ":" here and elsewhere, including method type annotations (e.g. line 169, 185).
Fixed.
> On Jan. 25, 2017, 10:28 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 152
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543710#file1543710line152>
> >
> > We should log which directory we're using for the store here at INFO.
Added a log statement.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review163035
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review163035
-----------------------------------------------------------
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 97)
<https://reviews.apache.org/r/52476/#comment234449>
"Got default storage ..."
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 137)
<https://reviews.apache.org/r/52476/#comment234451>
No space before ":" here and elsewhere, including method type annotations (e.g. line 169, 185).
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 152)
<https://reviews.apache.org/r/52476/#comment234450>
We should log which directory we're using for the store here at INFO.
- Prateek Maheshwari
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Feb. 8, 2017, 5:17 p.m., Jake Maes wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 33
> > <https://reviews.apache.org/r/52476/diff/5/?file=1543708#file1543708line33>
> >
> > nit: TimeUnit.DAYS.toMillis(1)
Fixed.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164701
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164701
-----------------------------------------------------------
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)
<https://reviews.apache.org/r/52476/#comment236461>
nit: TimeUnit.DAYS.toMillis(1)
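The constant at that line is not shown in this thread. As a minimal sketch, assuming it is a default retention period expressed in milliseconds, the suggested form compares with a hand-written literal as follows (the class and field names are hypothetical):

```java
import java.util.concurrent.TimeUnit;

final class RetentionDefaultSketch {
    // Hand-written form: the reader has to multiply the factors to see that this is one day.
    static final long ONE_DAY_MS_LITERAL = 24L * 60 * 60 * 1000;

    // Form suggested in the review: the unit and the amount are explicit.
    static final long ONE_DAY_MS = TimeUnit.DAYS.toMillis(1);

    public static void main(String[] args) {
        // Both constants hold the same number of milliseconds.
        System.out.println(ONE_DAY_MS == ONE_DAY_MS_LITERAL);
    }
}
```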
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (lines 170 - 171)
<https://reviews.apache.org/r/52476/#comment236464>
The logic structure is weird.
We really shouldn't need to read the file in order to return whether the file is present and its last modified time.
It seems like we need an "isStoreValid()" method which
1. Checks persistedStores.contains(storeName)
2. Calls isStaleLoggedStore to verify the age,
3. Calls a new isOffsetFileValid method to verify the contents of the file
And it returns true only if all the above checks succeed.
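The three checks above could be composed roughly as below. This is only a sketch under stated assumptions, not the code from the patch: the offset file name "OFFSET", the method signatures, and the "non-empty file" test standing in for content validation are all hypothetical. The age check uses file metadata only, so the file is not read to learn whether it exists or when it was last modified.

```java
import java.io.File;
import java.util.Set;

final class StoreValiditySketch {
    // Hypothetical file name; the real name is not given in this thread.
    static final String OFFSET_FILE_NAME = "OFFSET";

    // True if the offset file exists and was last modified more than deleteRetentionMs ago.
    static boolean isStaleLoggedStore(File storeDir, long deleteRetentionMs, long nowMs) {
        File offsetFile = new File(storeDir, OFFSET_FILE_NAME);
        return offsetFile.exists() && nowMs - offsetFile.lastModified() > deleteRetentionMs;
    }

    // Simplified content check: the offset file must be a regular, non-empty file.
    static boolean isOffsetFileValid(File storeDir) {
        File offsetFile = new File(storeDir, OFFSET_FILE_NAME);
        return offsetFile.isFile() && offsetFile.length() > 0;
    }

    // Returns true only if all three checks succeed.
    static boolean isStoreValid(Set<String> persistedStores, String storeName,
                                File storeDir, long deleteRetentionMs, long nowMs) {
        return persistedStores.contains(storeName)
            && !isStaleLoggedStore(storeDir, deleteRetentionMs, nowMs)
            && isOffsetFileValid(storeDir);
    }
}
```

Passing the current time in as a parameter is a choice made for this sketch so that the age check can be exercised in a test without touching file timestamps.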
- Jake Maes
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Feb. 8, 2017, 8:43 p.m., Jake Maes wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 177
> > <https://reviews.apache.org/r/52476/diff/4-6/?file=1541857#file1541857line177>
> >
> > Looks like this log statement belongs in an else-block. It says the store is stale, but in this block, we've determined hasValidOffsetFile=true.
> >
> > Also the wording isn't entirely accurate. The offset file may be present, but empty. It might be better to say "Offset file is not valid for store: %s"
My bad, this minor thing was missed while refactoring. Thanks for pointing it out. Fixed.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164765
-----------------------------------------------------------
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164765
-----------------------------------------------------------
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 173)
<https://reviews.apache.org/r/52476/#comment236527>
Looks like this log statement belongs in an else-block. It says the store is stale, but in this block, we've determined hasValidOffsetFile=true.
Also the wording isn't entirely accurate. The offset file may be present, but empty. It might be better to say "Offset file is not valid for store: %s"
- Jake Maes
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 196
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line196>
> >
> > s/in the store/for the store.
Fixed.
> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 199
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line199>
> >
> > No space before colon, here and other places in this file.
Fixed at all the places.
> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 159
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line159>
> >
> > No comma before 'if' if it's the last clause in the sentence.
Fixed.
> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 113
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line113>
> >
> > s/of the store/for the store.
Fixed.
> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 120
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line120>
> >
> > @param doesn't go in the method description, use {@code}
Fixed.
> On Feb. 8, 2017, 10:06 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 135
> > <https://reviews.apache.org/r/52476/diff/7/?file=1627958#file1627958line135>
> >
> > Explain what stale means here.
> >
> > Use @code instead of @param here.
Fixed.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164786
-----------------------------------------------------------
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164786
-----------------------------------------------------------
Fix it, then Ship it!
LGTM, thanks. A few code style and documentation comments.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 107)
<https://reviews.apache.org/r/52476/#comment236555>
Can delete, doesn't describe the entire block, and unnecessary for the first part.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 113)
<https://reviews.apache.org/r/52476/#comment236557>
s/of the store/for the store.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 120)
<https://reviews.apache.org/r/52476/#comment236554>
@param doesn't go in the method description, use {@code}
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 135)
<https://reviews.apache.org/r/52476/#comment236559>
Explain what stale means here.
Use @code instead of @param here.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 138)
<https://reviews.apache.org/r/52476/#comment236561>
Can just say "true if the store is stale". Description of what stale means should go in the method description.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 159)
<https://reviews.apache.org/r/52476/#comment236562>
No comma before 'if' if it's the last clause in the sentence.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 196)
<https://reviews.apache.org/r/52476/#comment236565>
s/in the store/for the store.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 198)
<https://reviews.apache.org/r/52476/#comment236563>
No space before colon, here and other places in this file.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 202)
<https://reviews.apache.org/r/52476/#comment236564>
Thanks!
- Prateek Maheshwari
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164782
-----------------------------------------------------------
Ship it!
- Jake Maes
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Feb. 8, 2017, 9:37 p.m.)
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. Because of log compaction, delete tombstones in the changelog topic have a TTL of delete.retention.ms. Replaying events from a changelog whose delete tombstones have already been removed would create an inconsistent local store (some delete events would be missing). During container startup, this patch deletes every local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
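To illustrate the inconsistency described above, here is a toy simulation. It is a sketch under simplifying assumptions, not Samza or Kafka code: the changelog is an in-memory list, compaction keeps only the latest record per key, and every tombstone is assumed to have outlived delete.retention.ms. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TombstoneReplaySketch {
    // One changelog record; a null value is a delete tombstone.
    static final class ChangelogEntry {
        final long offset;
        final String key;
        final String value;
        ChangelogEntry(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // put k1, put k2, then delete k1.
    static List<ChangelogEntry> fullChangelog() {
        return Arrays.asList(
            new ChangelogEntry(0, "k1", "v1"),
            new ChangelogEntry(1, "k2", "v2"),
            new ChangelogEntry(2, "k1", null));
    }

    // Keeps the latest record per key, then drops tombstones (retention has elapsed).
    static List<ChangelogEntry> compactAndExpireTombstones(List<ChangelogEntry> log) {
        Map<String, ChangelogEntry> latestByKey = new LinkedHashMap<>();
        for (ChangelogEntry e : log) {
            latestByKey.remove(e.key); // re-insert so iteration stays in offset order
            latestByKey.put(e.key, e);
        }
        List<ChangelogEntry> out = new ArrayList<>();
        for (ChangelogEntry e : latestByKey.values()) {
            if (e.value != null) out.add(e);
        }
        return out;
    }

    // Applies every record at or after fromOffset to the store, in order.
    static void replay(Map<String, String> store, List<ChangelogEntry> log, long fromOffset) {
        for (ChangelogEntry e : log) {
            if (e.offset < fromOffset) continue;
            if (e.value == null) store.remove(e.key);
            else store.put(e.key, e.value);
        }
    }

    // Old local store (flushed after offset 0) resumes from offset 1 on the compacted log.
    static Map<String, String> restoreStaleStore() {
        Map<String, String> store = new LinkedHashMap<>();
        store.put("k1", "v1");
        replay(store, compactAndExpireTombstones(fullChangelog()), 1);
        return store;
    }

    // Empty store restored from the beginning of the compacted log.
    static Map<String, String> restoreFreshStore() {
        Map<String, String> store = new LinkedHashMap<>();
        replay(store, compactAndExpireTombstones(fullChangelog()), 0);
        return store;
    }

    public static void main(String[] args) {
        System.out.println("stale: " + restoreStaleStore());
        System.out.println("fresh: " + restoreFreshStore());
    }
}
```

In this toy model the stale store still contains k1, because the tombstone that would have removed it is no longer in the log, while a store rebuilt from scratch contains only k2. Deleting the stale store and restoring from the start of the changelog avoids the mismatch.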
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala a3587d0a40c57374ee1742234929d444e381e42d
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c3308bfd7de04c335fef6cb66baa29286a230080
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9320cf744ff90d647a198b51cb06d2a526fe68fa
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit tests and manual tests were run to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Feb. 8, 2017, 8 p.m.)
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. Because of log compaction, delete tombstones in the changelog topic have a TTL of delete.retention.ms. Replaying events from a changelog whose delete tombstones have already been removed would create an inconsistent local store (some delete events would be missing). During container startup, this patch deletes every local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala a3587d0a40c57374ee1742234929d444e381e42d
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c3308bfd7de04c335fef6cb66baa29286a230080
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9320cf744ff90d647a198b51cb06d2a526fe68fa
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit tests and manual tests were run to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164749
-----------------------------------------------------------
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)
<https://reviews.apache.org/r/52476/#comment236506>
Doesn't look like this is fixed? Did you miss updating with a patch?
- Prateek Maheshwari
Re: Review Request 52476: SAMZA-1083 : Do not load task store which
are older than delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Feb. 8, 2017, 7:09 p.m.)
Review request for samza.
Summary (updated)
-----------------
SAMZA-1083 : Do not load task store which are older than delete tombstones.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. Because of log compaction, delete tombstones in the changelog topic have a TTL of delete.retention.ms. Replaying events from a changelog whose delete tombstones have already been removed would create an inconsistent local store (some delete events would be missing). During container startup, this patch deletes every local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
Diffs
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit testing and manual testing have been done to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Oct. 22, 2016, 10:06 p.m.)
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. With log compaction, delete tombstones in the changelog topic are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been removed would produce an inconsistent local store, because some delete events are missing. During container startup, this patch deletes any local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit testing and manual testing have been done to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Oct. 19, 2016, 10:04 p.m.)
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. With log compaction, delete tombstones in the changelog topic are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been removed would produce an inconsistent local store, because some delete events are missing. During container startup, this patch deletes any local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit testing and manual testing have been done to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 156
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line156>
> >
> > Unrelated, but let's make this info.
Done.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 33
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539988#file1539988line33>
> >
> > Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 60 * 1000L
Done.
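For illustration, the multiplication form suggested above might look like the following. The constant names are hypothetical; the thread does not show the actual name used in StorageConfig.scala.

```java
import java.util.concurrent.TimeUnit;

class RetentionDefaults {
    // Hypothetical constant: one day in milliseconds, written as a product so
    // each unit is visible. The trailing L makes the final product a long.
    static final long ONE_DAY_MS = 1 * 24 * 60 * 60 * 1000L;

    // An equivalent alternative using the JDK's TimeUnit.
    static final long ONE_DAY_MS_VIA_TIMEUNIT = TimeUnit.DAYS.toMillis(1);
}
```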
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 532
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539989#file1539989line532>
> >
> > Prefer passing the one config that we need explicitly instead of passing the config object.
Moved to use changeLogDeleteRetentions map.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 29
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line29>
> >
> > Unrelated to RB but prefer explicit imports.
Done.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 26
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line26>
> >
> > Delete or import explicitly.
Done.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 107
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line107>
> >
> > Add method description.
Done.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 114
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line114>
> >
> > Another case we ran into on Friday - if the oldest offset in the changelog topic is newer than the offset in the OFFSET file. Do you need to handle that here?
> >
> > Nitpick: would isStaleStore be clearer?
Discussed offline. This is a normal scenario with compaction (and with message expiration on topics in general). When the offset in the OFFSET file is older than the oldest offset in the changelog, it indicates that compaction has happened. To avoid missing messages from the topic, users have to consume from the oldest offset in the changelog, which is controlled by the config parameter systems.name.consumer.auto.offset.reset.
> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 122
> > <https://reviews.apache.org/r/52476/diff/3/?file=1539990#file1539990line122>
> >
> > Mention somewhere in the message that this means that the store is stale.
Done.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Prateek Maheshwari <pm...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
-----------------------------------------------------------
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)
<https://reviews.apache.org/r/52476/#comment222170>
Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 60 * 1000L
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 532)
<https://reviews.apache.org/r/52476/#comment222189>
Prefer passing the one config that we need explicitly instead of passing the config object.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 26)
<https://reviews.apache.org/r/52476/#comment222171>
Delete or import explicitly.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 29)
<https://reviews.apache.org/r/52476/#comment222190>
Unrelated to RB but prefer explicit imports.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 71)
<https://reviews.apache.org/r/52476/#comment222173>
SystemClock exists so that you can pass a "Clock" to your method/class and mock it in tests. Let's either do that (preferred) or use System.currentTimeMillis() directly.
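A rough sketch of the preferred option, passing a clock in so that tests can control time. The Clock interface and StoreAgeChecker class below are stand-ins defined for this example; they are not Samza's own Clock/SystemClock, whose exact signatures are not shown in this thread.

```java
class ClockInjectionSketch {
    /** Hypothetical single-method clock abstraction used only in this example. */
    interface Clock {
        long currentTimeMillis();
    }

    /** Hypothetical class that receives its time source instead of reading it globally. */
    static class StoreAgeChecker {
        private final Clock clock;
        private final long deleteRetentionMs;

        StoreAgeChecker(Clock clock, long deleteRetentionMs) {
            this.clock = clock;
            this.deleteRetentionMs = deleteRetentionMs;
        }

        boolean isStale(long offsetFileLastModifiedMs) {
            return clock.currentTimeMillis() - offsetFileLastModifiedMs > deleteRetentionMs;
        }
    }

    public static void main(String[] args) {
        // Production-style wiring: the real system clock.
        StoreAgeChecker real = new StoreAgeChecker(System::currentTimeMillis, 86_400_000L);
        System.out.println(real.isStale(0L));

        // Test-style wiring: a fixed clock makes the result deterministic.
        StoreAgeChecker fixed = new StoreAgeChecker(() -> 5_000L, 1_000L);
        System.out.println(fixed.isStale(4_500L));
    }
}
```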
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 98)
<https://reviews.apache.org/r/52476/#comment222184>
Looks like we've updated `fileOffset` in `#readOffsetFile` as a side effect even when the store is stale. Is that what we want here?
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 100)
<https://reviews.apache.org/r/52476/#comment222175>
Add an INFO message here.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 106)
<https://reviews.apache.org/r/52476/#comment222176>
Add method description.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 113)
<https://reviews.apache.org/r/52476/#comment222177>
Another case we ran into on Friday - if the oldest offset in the changelog topic is newer than the offset in the OFFSET file. Do you need to handle that here?
Nitpick: would isStaleStore be clearer?
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 118)
<https://reviews.apache.org/r/52476/#comment222180>
Looks like this is already logged at line 163?
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 119)
<https://reviews.apache.org/r/52476/#comment222179>
Don't `return` in scala code.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 121)
<https://reviews.apache.org/r/52476/#comment222181>
Mention somewhere in the message that this means that the store is stale.
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 150)
<https://reviews.apache.org/r/52476/#comment222178>
I'd prefer to split this into two methods - existence check and file read. Would be even nicer if fileOffset was updated explicitly (after staleness checks etc.) and not as a side effect of reading the file.
If you don't, let's add return type to method signature.
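One possible shape for that split, sketched under the assumption that the offset file holds a single offset as text. All names here (OffsetFileSketch, offsetFileExists, readOffset, offsetToRestoreFrom) are hypothetical and do not come from TaskStorageManager.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

class OffsetFileSketch {
    // Existence check only; no side effects.
    static boolean offsetFileExists(Path offsetFile) {
        return Files.isRegularFile(offsetFile);
    }

    // Pure read: returns the trimmed file contents and updates no fields.
    static String readOffset(Path offsetFile) {
        try {
            return new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The caller decides explicitly, after the staleness check, whether the
    // stored offset is used at all. Every method has an explicit return type.
    static Optional<String> offsetToRestoreFrom(Path offsetFile, boolean storeIsStale) {
        if (storeIsStale || !offsetFileExists(offsetFile)) {
            return Optional.empty();
        }
        return Optional.of(readOffset(offsetFile));
    }
}
```

With this shape, a stale store never has its offset loaded, so nothing is updated as a side effect of reading the file.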
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 155)
<https://reviews.apache.org/r/52476/#comment222182>
Unrelated, but let's make this info.
- Prateek Maheshwari
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Oct. 17, 2016, 10:40 p.m.)
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. With log compaction, delete tombstones in the changelog topic are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been removed would produce an inconsistent local store, because some delete events are missing. During container startup, this patch deletes any local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit testing and manual testing have been done to verify the functionality.
Thanks,
Shanthoosh Venkataraman
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing.
> >
> > In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property.
> >
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
>
> Shanthoosh Venkataraman wrote:
> I think 1) is the only plausible way to accomplish this through job config. The delete.retention.ms configuration applies only to store changelogs, not to topics in general, so making it a topic-level property might not be a good idea. Enforcing the delete.retention.ms property through config is harder, since Kafka is an external system. Ideally, if there is a way to fetch Kafka metadata/config (delete.retention.ms) for a topic, we could fetch that value during container startup rather than expecting users to specify it.
>
> Jake Maes wrote:
> Please take a look at org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties
Done.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing.
> >
> > In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property.
> >
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
I think 1) is the only plausible way to accomplish this through job config. The delete.retention.ms configuration applies only to store changelogs, not to topics in general, so making it a topic-level property might not be a good idea. Enforcing the delete.retention.ms property through config is harder, since Kafka is an external system. Ideally, if there is a way to fetch Kafka metadata/config (delete.retention.ms) for a topic, we could fetch that value during container startup rather than expecting users to specify it.
- Shanthoosh
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Jake Maes <jm...@apache.org>.
> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing.
> >
> > In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property.
> >
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
>
> Shanthoosh Venkataraman wrote:
> I think 1) is the only plausible way to accomplish this through job config. The delete.retention.ms configuration applies only to store changelogs, not to topics in general, so making it a topic-level property might not be a good idea. Enforcing the delete.retention.ms property through config is harder, since Kafka is an external system. Ideally, if there is a way to fetch Kafka metadata/config (delete.retention.ms) for a topic, we could fetch that value during container startup rather than expecting users to specify it.
Please take a look at org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties
- Jake
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Jake Maes <jm...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------
Looks better, but I think there's still one major part missing.
In order to have agreement between a kafka changelog and the task storage, the changelog should be created with the same delete.retention.ms property.
There are 2 ways to do this:
1. (preferred) update the kafka system admin to read the samza changelog property that you've defined (which also needs to be added to the config table, btw) and create the topic with that value for delete.retention.ms
2. Rename the property so it's one of the "topic-level-property" so it gets automatically passed to kafka. This is convenient but wouldn't apply to other systems, which could be useful if those other systems have a delete retention policy.
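To illustrate option 1, here is a hedged sketch of building the topic-level properties for a changelog topic from a single retention value, so that the store's staleness check and the topic agree. The class and method names are hypothetical, and this is not the implementation of KafkaConfig or the Kafka system admin; only the Kafka topic config keys cleanup.policy and delete.retention.ms are real.

```java
import java.util.Properties;

class ChangelogTopicPropsSketch {
    /**
     * Hypothetical helper: builds topic-level properties for a compacted
     * changelog topic using the same retention value the store check uses.
     */
    static Properties changelogTopicProperties(long deleteRetentionMs) {
        Properties props = new Properties();
        props.setProperty("cleanup.policy", "compact");
        props.setProperty("delete.retention.ms", Long.toString(deleteRetentionMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(changelogTopicProperties(86_400_000L));
    }
}
```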
- Jake Maes
Re: Review Request 52476: Do not load task store which are older than
delete tombstones.
Posted by Shanthoosh Venkataraman <sa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
-----------------------------------------------------------
(Updated Oct. 4, 2016, 11:33 p.m.)
Review request for samza.
Repository: samza
Description
-------
Every local task store is backed by a Kafka changelog topic. With log compaction, delete tombstones in the changelog topic are retained only for delete.retention.ms. Replaying a changelog whose delete tombstones have already been removed would produce an inconsistent local store, because some delete events are missing. During container startup, this patch deletes any local store for which the difference between the current time and the last-modified time of its offset file is greater than delete.retention.ms.
Diffs (updated)
-----
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
Diff: https://reviews.apache.org/r/52476/diff/
Testing
-------
Unit testing and manual testing have been done to verify the functionality.
Thanks,
Shanthoosh Venkataraman