You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2021/04/29 17:48:08 UTC

[samza] branch master updated: SAMZA-2652: Application Master High Availability metric - change counter to gauge (#1497)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8250a43  SAMZA-2652: Application Master High Availability metric - change counter to gauge (#1497)
8250a43 is described below

commit 8250a4380374e71bfcc9fe1ba43fd4cda773e7a0
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Apr 29 10:48:01 2021 -0700

    SAMZA-2652: Application Master High Availability metric - change counter to gauge (#1497)
    
    Description: Some of the AM HA metrics introduced in PR #1455 are counters. These are incremented correctly, but their values go back to 0 and cannot be measured at a later point in time.
    
    Changes: Convert these counters to gauges.
---
 .../coordinator/JobCoordinatorMetadataManager.java | 35 +++++++++++-----------
 .../TestJobCoordinatorMetadataManager.java         | 10 +++----
 .../samza/job/yarn/SamzaAppMasterMetrics.scala     |  4 +--
 3 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
index 860c596..7e40382 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -36,7 +36,6 @@ import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMes
 import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
@@ -284,44 +283,44 @@ public class JobCoordinatorMetadataManager {
     private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";
     private static final String NEW_DEPLOYMENT = "new-deployment";
 
-    private final Counter applicationAttemptCount;
-    private final Counter metadataGenerationFailedCount;
-    private final Counter metadataReadFailedCount;
-    private final Counter metadataWriteFailedCount;
+    private final Gauge<Integer> applicationAttemptCount;
+    private final Gauge<Integer> metadataGenerationFailedCount;
+    private final Gauge<Integer> metadataReadFailedCount;
+    private final Gauge<Integer> metadataWriteFailedCount;
     private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
     private final Gauge<Integer> configChangedAcrossApplicationAttempt;
     private final Gauge<Integer> newDeployment;
 
     public JobCoordinatorMetadataManagerMetrics(MetricsRegistry registry) {
-      applicationAttemptCount = registry.newCounter(GROUP, APPLICATION_ATTEMPT_COUNT);
+      applicationAttemptCount = registry.newGauge(GROUP, APPLICATION_ATTEMPT_COUNT, 0);
       configChangedAcrossApplicationAttempt =
           registry.newGauge(GROUP, CONFIG_CHANGED, 0);
       jobModelChangedAcrossApplicationAttempt =
           registry.newGauge(GROUP, JOB_MODEL_CHANGED, 0);
-      metadataGenerationFailedCount = registry.newCounter(GROUP,
-          METADATA_GENERATION_FAILED_COUNT);
-      metadataReadFailedCount = registry.newCounter(GROUP, METADATA_READ_FAILED_COUNT);
-      metadataWriteFailedCount = registry.newCounter(GROUP, METADATA_WRITE_FAILED_COUNT);
+      metadataGenerationFailedCount = registry.newGauge(GROUP,
+          METADATA_GENERATION_FAILED_COUNT, 0);
+      metadataReadFailedCount = registry.newGauge(GROUP, METADATA_READ_FAILED_COUNT, 0);
+      metadataWriteFailedCount = registry.newGauge(GROUP, METADATA_WRITE_FAILED_COUNT, 0);
       newDeployment = registry.newGauge(GROUP, NEW_DEPLOYMENT, 0);
     }
 
     @VisibleForTesting
-    Counter getApplicationAttemptCount() {
+    Gauge<Integer> getApplicationAttemptCount() {
       return applicationAttemptCount;
     }
 
     @VisibleForTesting
-    Counter getMetadataGenerationFailedCount() {
+    Gauge<Integer> getMetadataGenerationFailedCount() {
       return metadataGenerationFailedCount;
     }
 
     @VisibleForTesting
-    Counter getMetadataReadFailedCount() {
+    Gauge<Integer> getMetadataReadFailedCount() {
       return metadataReadFailedCount;
     }
 
     @VisibleForTesting
-    Counter getMetadataWriteFailedCount() {
+    Gauge<Integer> getMetadataWriteFailedCount() {
       return metadataWriteFailedCount;
     }
 
@@ -341,19 +340,19 @@ public class JobCoordinatorMetadataManager {
     }
 
     void incrementApplicationAttemptCount() {
-      applicationAttemptCount.inc();
+      applicationAttemptCount.set(applicationAttemptCount.getValue() + 1);
     }
 
     void incrementMetadataGenerationFailedCount() {
-      metadataGenerationFailedCount.inc();
+      metadataGenerationFailedCount.set(metadataGenerationFailedCount.getValue() + 1);
     }
 
     void incrementMetadataReadFailedCount() {
-      metadataReadFailedCount.inc();
+      metadataReadFailedCount.set(metadataReadFailedCount.getValue() + 1);
     }
 
     void incrementMetadataWriteFailedCount() {
-      metadataWriteFailedCount.inc();
+      metadataWriteFailedCount.set(metadataWriteFailedCount.getValue() + 1);
     }
 
     void setConfigChangedAcrossApplicationAttempt(int value) {
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
index d623aed..b1739d9 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -141,13 +141,13 @@ public class TestJobCoordinatorMetadataManager {
     JobCoordinatorMetadata newMetadataWithNoChange =
         new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
     assertEquals("Application attempt count should be 0", 0,
-        metrics.getApplicationAttemptCount().getCount());
+        metrics.getApplicationAttemptCount().getValue().intValue());
 
     metadataChanged =
         jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
     assertFalse("Metadata check should return false", metadataChanged);
     assertEquals("Application attempt count should be 1", 1,
-        metrics.getApplicationAttemptCount().getCount());
+        metrics.getApplicationAttemptCount().getValue().intValue());
   }
 
   @Test
@@ -161,7 +161,7 @@ public class TestJobCoordinatorMetadataManager {
     } catch (Exception e) {
       assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
       assertEquals("Metadata generation failed count should be 1", 1,
-          jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getCount());
+          jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getValue().intValue());
     }
   }
 
@@ -211,7 +211,7 @@ public class TestJobCoordinatorMetadataManager {
     JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     assertNull("Read failed should return null", actualMetadata);
     assertEquals("Metadata read failed count should be 1", 1,
-        jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getCount());
+        jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getValue().intValue());
   }
 
   @Test
@@ -240,7 +240,7 @@ public class TestJobCoordinatorMetadataManager {
     } catch (Exception e) {
       assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
       assertEquals("Metadata write failed count should be 1", 1,
-          jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getCount());
+          jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getValue().intValue());
     }
   }
 }
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index b85e3c5..fb540f7 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -41,12 +41,12 @@ class SamzaAppMasterMetrics(val config: Config,
   val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
 
   private val metricsConfig = new MetricsConfig(config)
-  val containersFromPreviousAttempts = newCounter("container-from-previous-attempt")
+  val containersFromPreviousAttempts = newGauge("container-from-previous-attempt", 0L)
   val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName).asScala
   reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
 
   def setContainersFromPreviousAttempts(containerCount: Int) {
-    containersFromPreviousAttempts.inc(containerCount)
+    containersFromPreviousAttempts.set(containerCount)
   }
 
   def start() {