You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/03/04 23:31:06 UTC

[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r819982317



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP

Review comment:
       Nit: 
   With elasticity-enabled, the SSPs have valid (i.e. >=0) keyBuckets, with 
   elasticity disabled keyBuckets on all SSPs are = -1.

##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP
+   */
+  private val sspKeyBucketsRegistered = new HashSet[SystemStreamPartition] ()
+

Review comment:
       yea looks like we really need that simplifiction.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org