Posted to dev@samza.apache.org by Jake Maes <ja...@gmail.com> on 2016/06/09 04:46:21 UTC

Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/
-----------------------------------------------------------

Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-964
    https://issues.apache.org/jira/browse/SAMZA-964


Repository: samza


Description
-------

SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

1. Cache metadata more aggressively. Only expire metadata if we get Kafka exceptions. This applies to all cases EXCEPT the partition count monitor, which uses the TTL from the StreamMetadataCache.
2. Reduce excessive offset fetching.
3. Do not allow unbounded exponential backoff for offset checkpointing; just skip the offset file. Exponential backoff can balloon the commit time and stall the event loop, so we will only retry up to 3 times, for a max delay of 400ms.
4. Add some trace log messages to help track/time KV Store flushes (the other culprit for the slowdown)
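
For illustration only, item 3 could be sketched roughly as below. This is not the code from the patch: the class name `BoundedRetry`, its parameters, and the doubling delay schedule are assumptions made up for this example. The point is that a failing operation gives up after a fixed number of retries and a capped total sleep, so the caller can skip the offset file instead of stalling the commit.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Samza.
class BoundedRetry {
    /**
     * Runs op and retries after failures, sleeping with doubling delays.
     * Returns empty once maxRetries retries are used up, or when the next
     * sleep would push the total sleep time past maxTotalDelayMs.
     * op is expected to return a non-null value.
     */
    static <T> Optional<T> call(Callable<T> op, int maxRetries,
                                long initialDelayMs, long maxTotalDelayMs) {
        long delayMs = initialDelayMs;
        long sleptMs = 0;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return Optional.of(op.call());
            } catch (Exception e) {
                boolean lastAttempt = attempt == maxRetries;
                if (lastAttempt || sleptMs + delayMs > maxTotalDelayMs) {
                    return Optional.empty(); // give up; the caller skips this round
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return Optional.empty();
                }
                sleptMs += delayMs;
                delayMs *= 2; // exponential backoff, but bounded by the checks above
            }
        }
        return Optional.empty();
    }
}
```

A caller would treat an empty result as "skip the offset file for this commit" and simply try again on the next commit.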


Diffs
-----

  samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java daa2212cf1d54e90861657fab86b2e780d7e89e2 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 0a6661c423a09944aa211223cad205958d3b1fee 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c7b05203a1958a62af9dec04b215d985c4646dc4 
  samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala 18b47ec3393978e403cadd8754f3fa5fd68654e9 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 110c3a910aa0bae77dfe5eebbf82286b56dc4654 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala c8ea64c7c67dd6bf789d2a3445d620ccef1beac0 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 23aa58dff6b5e282bb634d3913cacd73003402ea 
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6c292234dcdd54eaca05f3e1a3fc401e205d6066 
  samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala f0965aec5f3ec2a214dc40c70832c58273623749 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala c28f8db8cb59bd5415e78535877acc1e5bee0f67 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala 7bba6ff37d8266674e7f15c10c7c146f4a41fc91 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala 8e183efcdec6fd3f921fc2bfe1971c95715930ed 

Diff: https://reviews.apache.org/r/48459/diff/


Testing
-------

New unit tests. 

./gradlew clean build

Manual testing with 2 test jobs and the big job that had the performance issue.


Thanks,

Jake Maes


Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/#review137624
-----------------------------------------------------------


Ship it!




lgtm! +1

- Navina Ramesh




Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/
-----------------------------------------------------------

(Updated June 14, 2016, 11:14 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-964
    https://issues.apache.org/jira/browse/SAMZA-964


Repository: samza


Description
-------

SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

1. Cache metadata more aggressively. Only expire metadata if we get Kafka exceptions. This applies to all cases EXCEPT the partition count monitor, which uses the TTL from the StreamMetadataCache.
2. Reduce excessive offset fetching.
3. Do not allow unbounded exponential backoff for offset checkpointing; just skip the offset file. Exponential backoff can balloon the commit time and stall the event loop, so we will only retry up to 3 times, for a max delay of 400ms.
4. Add some trace log messages to help track/time KV Store flushes (the other culprit for the slowdown)


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java daa2212cf1d54e90861657fab86b2e780d7e89e2 
  samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 0a6661c423a09944aa211223cad205958d3b1fee 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c7b05203a1958a62af9dec04b215d985c4646dc4 
  samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala 18b47ec3393978e403cadd8754f3fa5fd68654e9 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 110c3a910aa0bae77dfe5eebbf82286b56dc4654 
  samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala c8ea64c7c67dd6bf789d2a3445d620ccef1beac0 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 23aa58dff6b5e282bb634d3913cacd73003402ea 
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6c292234dcdd54eaca05f3e1a3fc401e205d6066 
  samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala f0965aec5f3ec2a214dc40c70832c58273623749 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala c28f8db8cb59bd5415e78535877acc1e5bee0f67 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala 7bba6ff37d8266674e7f15c10c7c146f4a41fc91 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala 8e183efcdec6fd3f921fc2bfe1971c95715930ed 

Diff: https://reviews.apache.org/r/48459/diff/


Testing
-------

New unit tests. 

./gradlew clean build
bin/check-all.sh on my Mac

Manual testing with 2 test jobs and the big job that had the performance issue.


Thanks,

Jake Maes


Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/
-----------------------------------------------------------

(Updated June 14, 2016, 11:13 p.m.)




Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Jake Maes <ja...@gmail.com>.

> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala, line 230
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416302#file1416302line230>
> >
> >     nit: Doc can be improved by explaining why this is more efficient. That will be more informative. Or remove it altogether.

The Javadoc for the method explains why it is more efficient. I'll elaborate just a little here.


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala, line 67
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416303#file1416303line67>
> >
> >     Seems like we need a more precise interface for accessing stream's metadata cache. 
> >     An interface that will allow specifying list of ssps and the cacheTTL. 
> >     
> >     Note to self: Make changes to systemAdmin interface after the next release :)

Yeah, TTL is uniquely important to the partition count monitor because an infinite metadata cache would prevent it from detecting partition changes. Everywhere else, a longer cache is better. 

Your note to self is reflected at the top of the file. :-)


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 281
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line281>
> >
> >     nit: more accurate documentation here.  You can move it out of the function definition

I actually didn't intend to leave this comment in. The older version had no retry, but this version retries in order to start with an infinite cache TTL and then shorten it if there's a failure.


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 284
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line284>
> >
> >     I don't like what we are doing here - setting the ttl to be max value. However, I don't see a better way of doing it. :(

There's no reason to fetch metadata if everything is working, so initially we want to use the cache no matter what. If there's a problem, we should refresh. 

To accomplish this, I considered 2 ways:
1. Update the metadata cache to explicitly expire problematic entries. This would be a much more invasive change and potentially less performant. 
2. Lower the cache TTL if there is a problem so the cache refreshes itself. 

Both rely on a large or infinite initial TTL.
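
A minimal sketch of option 2, assuming a toy cache whose `get` takes the TTL on each call. `TtlCache`, `MetadataFetch` and `fetchWithFallback` are hypothetical names for this example and are not Samza APIs; the real StreamMetadataCache and KafkaSystemAdmin differ in detail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical cache: an entry is reloaded once it is at least ttlMs old.
class TtlCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long loadedAtMs;
        Entry(T value, long loadedAtMs) { this.value = value; this.loadedAtMs = loadedAtMs; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader;
    private final LongSupplier clockMs;

    TtlCache(Function<K, V> loader, LongSupplier clockMs) {
        this.loader = loader;
        this.clockMs = clockMs;
    }

    V get(K key, long ttlMs) {
        long now = clockMs.getAsLong();
        Entry<V> e = entries.get(key);
        // ttlMs == Long.MAX_VALUE effectively never expires; ttlMs == 0 always reloads.
        if (e == null || now - e.loadedAtMs >= ttlMs) {
            e = new Entry<>(loader.apply(key), now);
            entries.put(key, e);
        }
        return e.value;
    }
}

class MetadataFetch {
    /** Uses the cached value while things work; after a failure, forces a refresh. */
    static <K, V, R> R fetchWithFallback(TtlCache<K, V> cache, K key,
                                         Function<V, R> use, int maxRetries) {
        long ttlMs = Long.MAX_VALUE; // trust the cache initially
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return use.apply(cache.get(key, ttlMs));
            } catch (RuntimeException e) {
                last = e;
                ttlMs = 0; // lower the TTL so the next attempt reloads the entry
            }
        }
        throw last != null ? last : new IllegalArgumentException("maxRetries must be >= 0");
    }
}
```

The cache itself never needs to know which entries are "problematic"; the caller only changes the TTL it asks for, which is what keeps this approach less invasive than explicit expiry.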


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 324
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line324>
> >
> >     I strongly feel the need to make retries a part of the ExponentialSleepStrategy. It will certainly be used in many places. Considering the urgency of this fix and the fact that the runLoop is in Scala, we can table it for another day.

There is a separate, unrelated patch to enforce max retries on the strategy, but even that wouldn't change the logic here because we want to control the TTL and print meaningful log messages based on the remaining number of retries.


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala, line 192
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416309#file1416309line192>
> >
> >     nit: Do we really need these trace lines? If we do, can you change it to something like: "Begin flush to store" and "Flush to store complete"

They could be submitted in a separate patch, but they are useful trace messages. 
The original message only tells us that the flush method was called but doesn't help to trace the calls in the various KV Store wrappers. These new messages can also be used to time the operation. 

I don't see any value in making the messages more verbose. They currently read like this:
2016-06-14 21:31:19.171 [main] [SerializedKeyValueStore] [TRACE] Flushing store.
2016-06-14 21:31:19.171 [main] [LoggedStore] [TRACE] Flushing store.
2016-06-14 21:31:19.171 [main] [RocksDbKeyValueStore] [TRACE] Flushing store: pending-requests.
2016-06-14 21:31:19.172 [main] [RocksDbKeyValueStore] [TRACE] Flushed store: pending-requests.
2016-06-14 21:31:19.172 [main] [LoggedStore] [TRACE] Flushed store.
2016-06-14 21:31:19.172 [main] [SerializedKeyValueStore] [TRACE] Flushed store.
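
To show why the paired messages are enough to trace and time the nested calls, here is a small sketch. `FlushableStore` and `TracingStore` are hypothetical stand-ins, not the Samza store classes, and the log sink is a plain `Consumer<String>` rather than a real logger.

```java
import java.util.function.Consumer;

// Hypothetical minimal store interface for this example.
interface FlushableStore {
    void flush();
}

// Wraps another store and emits one message before and one after the delegated flush.
class TracingStore implements FlushableStore {
    private final String name;
    private final FlushableStore inner;
    private final Consumer<String> trace;

    TracingStore(String name, FlushableStore inner, Consumer<String> trace) {
        this.name = name;
        this.inner = inner;
        this.trace = trace;
    }

    @Override
    public void flush() {
        trace.accept("[" + name + "] Flushing store.");
        inner.flush(); // the time between the two messages is the cost of this layer and below
        trace.accept("[" + name + "] Flushed store.");
    }
}
```

With wrappers nested this way, the messages come out in the same bracketed order as the log excerpt above, and the timestamp difference between each "Flushing" and "Flushed" pair gives the time spent in that layer.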


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 276
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line276>
> >
> >     Documentation is not clear. It says "It does not retry if there is a failure". But you are retrying up to "maxretries".
> >     
> >     It will be useful to define what is considered a failure, for example, what exceptions are ok for retry and which ones aren't. If there isn't any clear distinction between the exceptions, you can mention that as well

Typo. I meant to say "It does not retry *indefinitely* if there is a failure."


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java, line 30
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416300#file1416300line30>
> >
> >     Why do we need 2 methods for getSystemStreamPartitionCounts? I think you can change the method signature since this interface was introduced in this current version. Was that the reason you added a new method? 
> >     
> >     I could be missing something very obvious here :D
> >     Seems like "getSystemStreamPartitionCounts(Set<String> streamNames)" is used only in a Mock implementation

I didn't realize the old method was added in this release, so I was keeping it for backward compatibility. Removed it.

Thanks for noticing.


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/#review137418
-----------------------------------------------------------




Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Navina Ramesh <nr...@linkedin.com>.

> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 276
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line276>
> >
> >     Documentation is not clear. It says "It does not retry if there is a failure". But you are retrying up to "maxretries".
> >     
> >     It will be useful to define what is considered a failure, for example, what exceptions are ok for retry and which ones aren't. If there isn't any clear distinction between the exceptions, you can mention that as well
> 
> Jake Maes wrote:
>     Typo. I meant to say "It does not retry *indefinitely* if there is a failure."

Got it. Thanks for fixing it!


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 284
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416306#file1416306line284>
> >
> >     I don't like what we are doing here - setting the ttl to be max value. However, I don't see a better way of doing it. :(
> 
> Jake Maes wrote:
>     There's no reason to fetch metadata if everything is working, so initially we want to use the cache no matter what. If there's a problem, we should refresh. 
>     
>     To accomplish this, I considered 2 ways:
>     1. Update the metadata cache to explicitly expire problematic entries. This would be a much more invasive change and potentially less performant. 
>     2. Lower the cache TTL if there is a problem so the cache refreshes itself. 
>     
>     Both rely on a large or infinite initial TTL.

Got it


> On June 14, 2016, 8:11 p.m., Navina Ramesh wrote:
> > samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala, line 192
> > <https://reviews.apache.org/r/48459/diff/3/?file=1416309#file1416309line192>
> >
> >     nit: Do we really need these trace lines? If we do, can you change it to something like: "Begin flush to store" and "Flush to store complete"
> 
> Jake Maes wrote:
>     They could be submitted in a separate patch, but they are useful trace messages. 
>     The original message only tells us that the flush method was called but doesn't help to trace the calls in the various KV Store wrappers. These new messages can also be used to time the operation. 
>     
>     I don't see any value in making the messages more verbose. They currently read like this:
>     2016-06-14 21:31:19.171 [main] [SerializedKeyValueStore] [TRACE] Flushing store.
>     2016-06-14 21:31:19.171 [main] [LoggedStore] [TRACE] Flushing store.
>     2016-06-14 21:31:19.171 [main] [RocksDbKeyValueStore] [TRACE] Flushing store: pending-requests.
>     2016-06-14 21:31:19.172 [main] [RocksDbKeyValueStore] [TRACE] Flushed store: pending-requests.
>     2016-06-14 21:31:19.172 [main] [LoggedStore] [TRACE] Flushed store.
>     2016-06-14 21:31:19.172 [main] [SerializedKeyValueStore] [TRACE] Flushed store.

oh cool. I didn't realize this. Thanks for pointing it out.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/#review137418
-----------------------------------------------------------




Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/#review137418
-----------------------------------------------------------




samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java (line 30)
<https://reviews.apache.org/r/48459/#comment202551>

    Why do we need 2 methods for getSystemStreamPartitionCounts? I think you can change the method signature since this interface was introduced in this current version. Was that the reason you added a new method? 
    
    I could be missing something very obvious here :D
    Seems like "getSystemStreamPartitionCounts(Set<String> streamNames)" is used only in a Mock implementation



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 230)
<https://reviews.apache.org/r/48459/#comment202715>

    nit: Doc can be improved by explaining why this is more efficient. That will be more informative. Or remove it altogether.



samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala (line 67)
<https://reviews.apache.org/r/48459/#comment202719>

    Seems like we need a more precise interface for accessing stream's metadata cache. 
    An interface that will allow specifying list of ssps and the cacheTTL. 
    
    Note to self: Make changes to systemAdmin interface after the next release :)



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 274)
<https://reviews.apache.org/r/48459/#comment202713>

    Documentation is not clear. It says "It does not retry if there is a failure". But you are retrying up to "maxretries".
    
    It will be useful to define what is considered a failure, for example, what exceptions are ok for retry and which ones aren't. If there isn't any clear distinction between the exceptions, you can mention that as well



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 279)
<https://reviews.apache.org/r/48459/#comment202714>

    nit: more accurate documentation here.  You can move it out of the function definition



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 282)
<https://reviews.apache.org/r/48459/#comment202721>

    I don't like what we are doing here - setting the ttl to be max value. However, I don't see a better way of doing it. :(



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 322)
<https://reviews.apache.org/r/48459/#comment202722>

    I strongly feel the need to make retries a part of the ExponentialSleepStrategy. It will certainly be used in many places. Considering the urgency of this fix and the fact that the runLoop is in Scala, we can table it for another day.



samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala (line 191)
<https://reviews.apache.org/r/48459/#comment202723>

    nit: Do we really need these trace lines? If we do, can you change it to something like: "Begin flush to store" and "Flush to store complete"



samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala (line 103)
<https://reviews.apache.org/r/48459/#comment202724>

    Same comment as above


- Navina Ramesh


On June 13, 2016, 12:22 a.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48459/
> -----------------------------------------------------------
> 
> (Updated June 13, 2016, 12:22 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-964
>     https://issues.apache.org/jira/browse/SAMZA-964
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores
> 
> 1. Cache metadata more aggressively. Only expire metadata if we get Kafka exceptions. This applies to all cases EXCEPT the partition count monitor, which uses the TTL from the StreamMetadataCache.
> 2. Reduce excessive offset fetching.
> 3. Do not allow unbounded exponential backoff for offset checkpointing; just skip the offset file instead. Exponential backoff can balloon the commit time and stall the event loop, so we will only retry up to 3 times, for a max delay of 400ms.
> 4. Add some trace log messages to help track/time KV store flushes (the other culprit for the slowdown).
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java daa2212cf1d54e90861657fab86b2e780d7e89e2 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java 0a6661c423a09944aa211223cad205958d3b1fee 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c7b05203a1958a62af9dec04b215d985c4646dc4 
>   samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala 18b47ec3393978e403cadd8754f3fa5fd68654e9 
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 110c3a910aa0bae77dfe5eebbf82286b56dc4654 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala c8ea64c7c67dd6bf789d2a3445d620ccef1beac0 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 23aa58dff6b5e282bb634d3913cacd73003402ea 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6c292234dcdd54eaca05f3e1a3fc401e205d6066 
>   samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala f0965aec5f3ec2a214dc40c70832c58273623749 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala c28f8db8cb59bd5415e78535877acc1e5bee0f67 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala 7bba6ff37d8266674e7f15c10c7c146f4a41fc91 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala 8e183efcdec6fd3f921fc2bfe1971c95715930ed 
> 
> Diff: https://reviews.apache.org/r/48459/diff/
> 
> 
> Testing
> -------
> 
> New unit tests. 
> 
> ./gradlew clean build
> bin/check-all.sh on my Mac
> 
> Manual testing with 2 test jobs and the big job that had the performance issue.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>


Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/
-----------------------------------------------------------

(Updated June 13, 2016, 12:22 a.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-964
    https://issues.apache.org/jira/browse/SAMZA-964


Repository: samza


Diff: https://reviews.apache.org/r/48459/diff/


Testing (updated)
-------

New unit tests. 

./gradlew clean build
bin/check-all.sh on my Mac

Manual testing with 2 test jobs and the big job that had the performance issue.


Thanks,

Jake Maes


Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/
-----------------------------------------------------------

(Updated June 12, 2016, 11:54 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-964
    https://issues.apache.org/jira/browse/SAMZA-964


Repository: samza


Diffs (updated)
-----

Diff: https://reviews.apache.org/r/48459/diff/


Testing
-------

New unit tests. 

./gradlew clean build

Manual testing with 2 test jobs and the big job that had the performance issue.


Thanks,

Jake Maes


Re: Review Request 48459: SAMZA-964 Improve the performance of the continuous OFFSET checkpointing for logged stores

Posted by Jake Maes <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48459/
-----------------------------------------------------------

(Updated June 9, 2016, 4:48 a.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).


Bugs: SAMZA-964
    https://issues.apache.org/jira/browse/SAMZA-964


Repository: samza


Diffs (updated)
-----

Diff: https://reviews.apache.org/r/48459/diff/


Testing
-------

New unit tests. 

./gradlew clean build

Manual testing with 2 test jobs and the big job that had the performance issue.


Thanks,

Jake Maes