Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/02/22 18:50:37 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

lakshmi-manasa-g opened a new pull request #1585:
URL: https://github.com/apache/samza/pull/1585


   Symptom: Certain container level metrics have incorrect values when elasticity is enabled. Metrics affected: chose-object, num-envelopes, process-calls, messages-chosen.
   
   Cause: With elasticity enabled, a container may process only some of the messages of an SSP rather than all of them. The messages it processes belong to the key buckets within the container's model. However, the pre-elasticity container level metrics counted messages per SSP.
   
   Changes: Adapt SystemConsumers and RunLoop to account for key buckets. SystemConsumers keeps track of all the SSPs (with key buckets) registered with it by the RunLoop and increments the metrics only if the message belongs to this SSP set.
   
   Tests: added unit tests to verify correct values for metrics
   
   API changes: None
   
   Usage/Upgrade instructions: None
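
A minimal sketch of the bookkeeping described under "Changes", not the code in this PR. `BucketedSsp` and `ChoseObjectCounter` are hypothetical stand-ins for Samza's `SystemStreamPartition` and the metric update in `SystemConsumers`; the only idea taken from the description is that the counter moves only when the envelope's SSP (including its key bucket) is in the registered set.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for an SSP that carries a key bucket; not Samza's SystemStreamPartition.
final class BucketedSsp {
  final String stream;
  final int partition;
  final int keyBucket; // -1 when elasticity is disabled

  BucketedSsp(String stream, int partition, int keyBucket) {
    this.stream = stream;
    this.partition = partition;
    this.keyBucket = keyBucket;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof BucketedSsp)) {
      return false;
    }
    BucketedSsp other = (BucketedSsp) o;
    return stream.equals(other.stream)
        && partition == other.partition
        && keyBucket == other.keyBucket;
  }

  @Override
  public int hashCode() {
    return Objects.hash(stream, partition, keyBucket);
  }
}

// Hypothetical counter: a chosen envelope is counted only if its key bucket was registered.
final class ChoseObjectCounter {
  private final Set<BucketedSsp> registered = new HashSet<>();
  private long choseObject = 0;

  void register(BucketedSsp ssp) {
    registered.add(ssp);
  }

  void onChosen(BucketedSsp envelopeSsp) {
    if (registered.contains(envelopeSsp)) {
      choseObject++;
    }
  }

  long choseObject() {
    return choseObject;
  }
}
```

With elasticity disabled every SSP in this sketch would carry key bucket -1, so the membership check always passes for a registered SSP and the count matches the pre-elasticity behavior.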


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r819982317



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP

Review comment:
       Nit: 
   With elasticity enabled, the SSPs have valid (i.e. >= 0) keyBuckets; with 
   elasticity disabled, keyBuckets on all SSPs are -1.

##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP
+   */
+  private val sspKeyBucketsRegistered = new HashSet[SystemStreamPartition] ()
+

Review comment:
       Yeah, looks like we really need that simplification.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r820018242



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -219,6 +231,7 @@ class SystemConsumers (
     // If elasticity is enabled then the RunLoop gives SSP with keybucket
     // but the MessageChooser does not know about the KeyBucket
     // hence, use an SSP without KeyBucket
+    sspKeyBucketsRegistered.add(ssp)

Review comment:
       Updated to use the condition when correcting the choseObject metric.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r819996936



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -219,6 +231,7 @@ class SystemConsumers (
     // If elasticity is enabled then the RunLoop gives SSP with keybucket
     // but the MessageChooser does not know about the KeyBucket
     // hence, use an SSP without KeyBucket
+    sspKeyBucketsRegistered.add(ssp)

Review comment:
       `sspKeyBucketsRegistered` is used in one place only: where choseObject is incremented. Is the suggestion to not have `sspKeyBucketsRegistered` if the elasticity factor = 1? With elasticity factor > 1, this bookkeeping is needed for correct counting of chose-object.
   
   removeKeyBucket is needed when the elasticity factor > 1. It is currently used in 4 places, and without it there would have to be an if-then-else check in all of those places.







[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814165532



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -335,7 +353,19 @@ class SystemConsumers (
       while (sspAndEnvelopeIterator.hasNext) {
         val sspAndEnvelope = sspAndEnvelopeIterator.next
         val systemStreamPartition = sspAndEnvelope.getKey
-        val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+        val filtered_envelopes = new util.ArrayList[IncomingMessageEnvelope](sspAndEnvelope.getValue)
+        // filter out all the envelopes with SSP not registered with this SystemConsumers
+        // with elasticity enabled, SSP of the envelope will be the SSP with KeyBucket
+        // and hence will filter out envelopes if their key bucket is not registered
+        // without elasticity, there are no key buckets
+        // and hence full SSP is registered and all envelopes of the SSP will be retained
+        filtered_envelopes.removeIf {
+          new Predicate[IncomingMessageEnvelope] {
+            def test(envelope: IncomingMessageEnvelope): Boolean =
+              !sspKeyBucketsRegistered.contains(envelope.getSystemStreamPartition(elasticityFactor))
+          }
+        }
+        val envelopes = new ArrayDeque[IncomingMessageEnvelope](filtered_envelopes)

Review comment:
       Then why does RunLoop need to decrement the metric when we have filtering here?







[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814164814



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP
+   */
+  private val sspKeyBucketsRegistered = new HashSet[SystemStreamPartition] ()
+

Review comment:
       It seems that in some places in the code we use SystemStreamPartition with keyBucket as -1, and in other places with keyBucket not -1, and call it "full SSP".
   Would it be better to create a separate class called 
   SystemStreamPartitionKeyBucket that extends SystemStreamPartition, 
   so that we deal with SystemStreamPartitionKeyBucket wherever/whenever we want keyBucket != -1, and use SSP wherever keyBucket is -1?
   
   That will make the code much more understandable and debuggable in the future.
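
A rough sketch of the separation suggested above, under stated assumptions: `FullSsp` and `SspKeyBucket` are hypothetical, simplified stand-ins (not Samza classes), and the constructor check that rejects a negative key bucket is an illustrative design choice rather than anything agreed in this thread.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for a full SSP (no key bucket).
class FullSsp {
  final String system;
  final String stream;
  final int partition;

  FullSsp(String system, String stream, int partition) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    // Exact class match keeps equals symmetric between the base class and its subclass.
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    FullSsp other = (FullSsp) o;
    return system.equals(other.system)
        && stream.equals(other.stream)
        && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(system, stream, partition);
  }
}

// Hypothetical subclass that always carries a valid (>= 0) key bucket.
final class SspKeyBucket extends FullSsp {
  final int keyBucket;

  SspKeyBucket(String system, String stream, int partition, int keyBucket) {
    super(system, stream, partition);
    if (keyBucket < 0) {
      throw new IllegalArgumentException("keyBucket must be >= 0");
    }
    this.keyBucket = keyBucket;
  }

  // Drops the key bucket, e.g. for components that only know about full SSPs.
  FullSsp withoutKeyBucket() {
    return new FullSsp(system, stream, partition);
  }

  @Override
  public boolean equals(Object o) {
    // super.equals already guarantees o is an SspKeyBucket, so the cast is safe.
    return super.equals(o) && keyBucket == ((SspKeyBucket) o).keyBucket;
  }

  @Override
  public int hashCode() {
    return 31 * super.hashCode() + keyBucket;
  }
}
```

In this sketch a key-bucketed SSP never compares equal to its full SSP, so a set keyed on one type cannot silently match the other.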







[GitHub] [samza] cameronlee314 merged pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1585:
URL: https://github.com/apache/samza/pull/1585


   





[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814161058



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -276,6 +276,9 @@ private void runTasks(IncomingMessageEnvelope envelope) {
           consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition(elasticityFactor));

Review comment:
       In the comments above:
   "This condition happens when only a subset of the keyBuckets of an SSP (but not all keyBuckets) are being consumed at this container. Therefore, we 
   fake the consumption of the envelope 
   and decrement the metric, since this envelope's processing is skipped."







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r819043852



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -276,6 +276,9 @@ private void runTasks(IncomingMessageEnvelope envelope) {
           consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition(elasticityFactor));
           log.trace("updating the system consumers for ssp keyBucket {} not processed by this runloop",
               envelope.getSystemStreamPartition(elasticityFactor));
+          // since this envelope is not processed by the container, need to decrement the # envelopes metric
+          // # envelopes metric was incremented when the envelope was returned by the SystemConsumers
+          containerMetrics.envelopes().dec();

Review comment:
       added
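
A minimal sketch of the accounting discussed for this hunk, assuming a simplified loop in which key buckets are plain integers. `SkipAccounting` and its fields are hypothetical names; only the idea of decrementing the envelopes count and incrementing a skipped count for an envelope the container does not process comes from this thread.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical accounting helper; not Samza's RunLoop or SamzaContainerMetrics.
final class SkipAccounting {
  final AtomicLong envelopes = new AtomicLong();
  final AtomicLong envelopesSkipped = new AtomicLong();
  private final Set<Integer> ownedKeyBuckets;

  SkipAccounting(Set<Integer> ownedKeyBuckets) {
    this.ownedKeyBuckets = ownedKeyBuckets;
  }

  // Called when the consumer hands an envelope to the loop.
  void onReturned() {
    envelopes.incrementAndGet();
  }

  // Returns true if the envelope should be processed by this container.
  boolean dispatch(int keyBucket) {
    if (ownedKeyBuckets.contains(keyBucket)) {
      return true;
    }
    // Not owned here: undo the earlier increment and record the skip.
    envelopes.decrementAndGet();
    envelopesSkipped.incrementAndGet();
    return false;
  }
}
```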

##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP
+   */
+  private val sspKeyBucketsRegistered = new HashSet[SystemStreamPartition] ()
+

Review comment:
       This involves rework of previous elasticity PRs. I'd prefer doing this as a separate PR and not in this one. SAMZA-2726 is for this.

##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -335,7 +353,19 @@ class SystemConsumers (
       while (sspAndEnvelopeIterator.hasNext) {
         val sspAndEnvelope = sspAndEnvelopeIterator.next
         val systemStreamPartition = sspAndEnvelope.getKey
-        val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+        val filtered_envelopes = new util.ArrayList[IncomingMessageEnvelope](sspAndEnvelope.getValue)
+        // filter out all the envelopes with SSP not registered with this SystemConsumers
+        // with elasticity enabled, SSP of the envelope will be the SSP with KeyBucket
+        // and hence will filter out envelopes if their key bucket is not registered
+        // without elasticity, there are no key buckets
+        // and hence full SSP is registered and all envelopes of the SSP will be retained
+        filtered_envelopes.removeIf {
+          new Predicate[IncomingMessageEnvelope] {
+            def test(envelope: IncomingMessageEnvelope): Boolean =
+              !sspKeyBucketsRegistered.contains(envelope.getSystemStreamPartition(elasticityFactor))
+          }
+        }
+        val envelopes = new ArrayDeque[IncomingMessageEnvelope](filtered_envelopes)

Review comment:
       Removing this filtering logic, as it is an optimization that needs more investigation. Created SAMZA-2724 for this piece.
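
For reference, the filtering idea in the hunk above can be sketched in Java with a lambda in place of the anonymous `Predicate`. This is only an illustration under simplifying assumptions: envelopes are represented by their key bucket numbers, and `EnvelopeFilter` is a hypothetical helper, not part of Samza.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating the removeIf-based filtering.
final class EnvelopeFilter {
  // Keeps only the envelopes whose key bucket is registered, preserving order.
  static ArrayDeque<Integer> keepRegistered(List<Integer> envelopeBuckets, Set<Integer> registered) {
    // Copy first so the caller's list is left untouched.
    List<Integer> filtered = new ArrayList<>(envelopeBuckets);
    filtered.removeIf(bucket -> !registered.contains(bucket));
    return new ArrayDeque<>(filtered);
  }
}
```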







[GitHub] [samza] cameronlee314 commented on pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on pull request #1585:
URL: https://github.com/apache/samza/pull/1585#issuecomment-1061348400


   Merging at the request of @rmatharu who is having some account issues





[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r819982545



##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -219,6 +231,7 @@ class SystemConsumers (
     // If elasticity is enabled then the RunLoop gives SSP with keybucket
     // but the MessageChooser does not know about the KeyBucket
     // hence, use an SSP without KeyBucket
+    sspKeyBucketsRegistered.add(ssp)

Review comment:
       Should this be conditional on the elasticity factor being > 1?
   Otherwise, the removeKeyBucket stuff can be skipped?







[GitHub] [samza] rmatharu commented on a change in pull request #1585: SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814161456



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -276,6 +276,9 @@ private void runTasks(IncomingMessageEnvelope envelope) {
           consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition(elasticityFactor));
           log.trace("updating the system consumers for ssp keyBucket {} not processed by this runloop",
               envelope.getSystemStreamPartition(elasticityFactor));
+          // since this envelope is not processed by the container, need to decrement the # envelopes metric
+          // # envelopes metric was incremented when the envelope was returned by the SystemConsumers
+          containerMetrics.envelopes().dec();

Review comment:
       Can we add a metric here called envelopesSkipped that we increment in this case?



