You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2021/04/29 17:48:08 UTC
[samza] branch master updated: SAMZA-2652: Application Master High Availability metric - change counter to gauge (#1497)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8250a43 SAMZA-2652: Application Master High Availability metric - change counter to gauge (#1497)
8250a43 is described below
commit 8250a4380374e71bfcc9fe1ba43fd4cda773e7a0
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Apr 29 10:48:01 2021 -0700
SAMZA-2652: Application Master High Availability metric - change counter to gauge (#1497)
Description: Some of the Application Master (AM) High Availability (HA) metrics introduced in PR #1455 are counters. They are incremented correctly, but they drop back to 0 afterwards, so their values cannot be observed at a later point in time.
Changes: Convert these counters to gauges.
---
.../coordinator/JobCoordinatorMetadataManager.java | 35 +++++++++++-----------
.../TestJobCoordinatorMetadataManager.java | 10 +++----
.../samza/job/yarn/SamzaAppMasterMetrics.scala | 4 +--
3 files changed, 24 insertions(+), 25 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
index 860c596..7e40382 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -36,7 +36,6 @@ import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMes
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
@@ -284,44 +283,44 @@ public class JobCoordinatorMetadataManager {
private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";
private static final String NEW_DEPLOYMENT = "new-deployment";
- private final Counter applicationAttemptCount;
- private final Counter metadataGenerationFailedCount;
- private final Counter metadataReadFailedCount;
- private final Counter metadataWriteFailedCount;
+ private final Gauge<Integer> applicationAttemptCount;
+ private final Gauge<Integer> metadataGenerationFailedCount;
+ private final Gauge<Integer> metadataReadFailedCount;
+ private final Gauge<Integer> metadataWriteFailedCount;
private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
private final Gauge<Integer> configChangedAcrossApplicationAttempt;
private final Gauge<Integer> newDeployment;
public JobCoordinatorMetadataManagerMetrics(MetricsRegistry registry) {
- applicationAttemptCount = registry.newCounter(GROUP, APPLICATION_ATTEMPT_COUNT);
+ applicationAttemptCount = registry.newGauge(GROUP, APPLICATION_ATTEMPT_COUNT, 0);
configChangedAcrossApplicationAttempt =
registry.newGauge(GROUP, CONFIG_CHANGED, 0);
jobModelChangedAcrossApplicationAttempt =
registry.newGauge(GROUP, JOB_MODEL_CHANGED, 0);
- metadataGenerationFailedCount = registry.newCounter(GROUP,
- METADATA_GENERATION_FAILED_COUNT);
- metadataReadFailedCount = registry.newCounter(GROUP, METADATA_READ_FAILED_COUNT);
- metadataWriteFailedCount = registry.newCounter(GROUP, METADATA_WRITE_FAILED_COUNT);
+ metadataGenerationFailedCount = registry.newGauge(GROUP,
+ METADATA_GENERATION_FAILED_COUNT, 0);
+ metadataReadFailedCount = registry.newGauge(GROUP, METADATA_READ_FAILED_COUNT, 0);
+ metadataWriteFailedCount = registry.newGauge(GROUP, METADATA_WRITE_FAILED_COUNT, 0);
newDeployment = registry.newGauge(GROUP, NEW_DEPLOYMENT, 0);
}
@VisibleForTesting
- Counter getApplicationAttemptCount() {
+ Gauge<Integer> getApplicationAttemptCount() {
return applicationAttemptCount;
}
@VisibleForTesting
- Counter getMetadataGenerationFailedCount() {
+ Gauge<Integer> getMetadataGenerationFailedCount() {
return metadataGenerationFailedCount;
}
@VisibleForTesting
- Counter getMetadataReadFailedCount() {
+ Gauge<Integer> getMetadataReadFailedCount() {
return metadataReadFailedCount;
}
@VisibleForTesting
- Counter getMetadataWriteFailedCount() {
+ Gauge<Integer> getMetadataWriteFailedCount() {
return metadataWriteFailedCount;
}
@@ -341,19 +340,19 @@ public class JobCoordinatorMetadataManager {
}
void incrementApplicationAttemptCount() {
- applicationAttemptCount.inc();
+ applicationAttemptCount.set(applicationAttemptCount.getValue() + 1);
}
void incrementMetadataGenerationFailedCount() {
- metadataGenerationFailedCount.inc();
+ metadataGenerationFailedCount.set(metadataGenerationFailedCount.getValue() + 1);
}
void incrementMetadataReadFailedCount() {
- metadataReadFailedCount.inc();
+ metadataReadFailedCount.set(metadataReadFailedCount.getValue() + 1);
}
void incrementMetadataWriteFailedCount() {
- metadataWriteFailedCount.inc();
+ metadataWriteFailedCount.set(metadataWriteFailedCount.getValue() + 1);
}
void setConfigChangedAcrossApplicationAttempt(int value) {
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
index d623aed..b1739d9 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -141,13 +141,13 @@ public class TestJobCoordinatorMetadataManager {
JobCoordinatorMetadata newMetadataWithNoChange =
new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
assertEquals("Application attempt count should be 0", 0,
- metrics.getApplicationAttemptCount().getCount());
+ metrics.getApplicationAttemptCount().getValue().intValue());
metadataChanged =
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
assertFalse("Metadata check should return false", metadataChanged);
assertEquals("Application attempt count should be 1", 1,
- metrics.getApplicationAttemptCount().getCount());
+ metrics.getApplicationAttemptCount().getValue().intValue());
}
@Test
@@ -161,7 +161,7 @@ public class TestJobCoordinatorMetadataManager {
} catch (Exception e) {
assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
assertEquals("Metadata generation failed count should be 1", 1,
- jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getCount());
+ jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getValue().intValue());
}
}
@@ -211,7 +211,7 @@ public class TestJobCoordinatorMetadataManager {
JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
assertNull("Read failed should return null", actualMetadata);
assertEquals("Metadata read failed count should be 1", 1,
- jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getCount());
+ jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getValue().intValue());
}
@Test
@@ -240,7 +240,7 @@ public class TestJobCoordinatorMetadataManager {
} catch (Exception e) {
assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
assertEquals("Metadata write failed count should be 1", 1,
- jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getCount());
+ jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getValue().intValue());
}
}
}
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index b85e3c5..fb540f7 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -41,12 +41,12 @@ class SamzaAppMasterMetrics(val config: Config,
val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
private val metricsConfig = new MetricsConfig(config)
- val containersFromPreviousAttempts = newCounter("container-from-previous-attempt")
+ val containersFromPreviousAttempts = newGauge("container-from-previous-attempt", 0L)
val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName).asScala
reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
def setContainersFromPreviousAttempts(containerCount: Int) {
- containersFromPreviousAttempts.inc(containerCount)
+ containersFromPreviousAttempts.set(containerCount)
}
def start() {