You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/01 23:16:22 UTC
[gobblin] branch master updated: [GOBBLIN-1452] Add meters for
successful/failed dags in total and by flowGroup
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 557f255 [GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup
557f255 is described below
commit 557f255d973255e50420f821b512f7c805552ae1
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Jun 1 16:16:16 2021 -0700
[GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup
Closes #3290 from jack-moseley/flow-meters
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 2 ++
.../service/modules/orchestration/DagManager.java | 37 +++++++++++++++++++---
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 269e5bc..7e6bb55 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -40,6 +40,8 @@ public class ServiceMetricNames {
public static final String CREATE_FLOW_METER = "CreateFlow";
public static final String DELETE_FLOW_METER = "DeleteFlow";
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
+ public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
+ public static final String FAILED_FLOW_METER = "FailedFlows";
public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
public static final String SERVICE_USERS = "ServiceUsers";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 727c438..37632d7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -59,6 +59,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
@@ -431,6 +432,8 @@ public class DagManager extends AbstractIdleService {
private final Map<String, Integer> perUserQuota;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
private static Map<String, FlowState> flowGauges = Maps.newHashMap();
+ private ContextAwareMeter allSuccessfulMeter;
+ private ContextAwareMeter allFailedMeter;
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
@@ -460,6 +463,10 @@ public class DagManager extends AbstractIdleService {
this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
() -> orchestrationDelay.get());
+ this.allSuccessfulMeter = metricContext.contextAwareMeter(
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+ this.allFailedMeter = metricContext.contextAwareMeter(
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FAILED_FLOW_METER));
this.metricContext.register(orchestrationDelayMetric);
} else {
this.metricContext = null;
@@ -1133,6 +1140,12 @@ public class DagManager extends AbstractIdleService {
return counters;
}
+
+ private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName) { // Returns the per-flowGroup meter for the dag identified by dagId.
+ String flowGroup = DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup(); // dagId is looked up in this.dags; the result is not null-checked here.
+ return metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, meterName)); // Name is built from the service prefix, the flow group and meterName. metricContext is dereferenced unguarded; callers in this patch check it for null first.
+ }
+
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
@@ -1140,7 +1153,6 @@ public class DagManager extends AbstractIdleService {
List<String> dagIdstoClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
- addFailedDag(dagId);
//Skip monitoring of any other jobs of the failed dag.
LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
while (!dagNodeList.isEmpty()) {
@@ -1148,7 +1160,7 @@ public class DagManager extends AbstractIdleService {
deleteJobState(dagId, dagNode);
}
log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
- flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
+ onFlowFailure(dagId);
// send an event before cleaning up dag
DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
dagIdstoClean.add(dagId);
@@ -1159,12 +1171,12 @@ public class DagManager extends AbstractIdleService {
if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- addFailedDag(dagId);
+ onFlowFailure(dagId);
status = TimingEvent.FlowTimings.FLOW_FAILED;
this.failedDagIdsFinishAllPossible.remove(dagId);
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
} else {
- flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
+ onFlowSuccess(dagId);
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
// send an event before cleaning up dag
@@ -1178,6 +1190,23 @@ public class DagManager extends AbstractIdleService {
}
}
+ private void onFlowSuccess(String dagId) { // Records a successful flow for the dag identified by dagId; does nothing when metricContext is null.
+ if (this.metricContext != null) { // Gauge and meter updates below only happen when a metric context exists.
+ flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL); // Keyed by the string form of the dag's flow id.
+ this.allSuccessfulMeter.mark(); // Service-wide successful-flow meter.
+ getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER).mark(); // Successful-flow meter scoped to this dag's flow group.
+ }
+ }
+
+ private void onFlowFailure(String dagId) { // Records a failed flow for the dag identified by dagId.
+ addFailedDag(dagId); // Runs regardless of whether metrics are enabled.
+ if (this.metricContext != null) { // Gauge and meter updates below only happen when a metric context exists.
+ flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED); // Keyed by the string form of the dag's flow id.
+ this.allFailedMeter.mark(); // Service-wide failed-flow meter.
+ getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER).mark(); // Failed-flow meter scoped to this dag's flow group.
+ }
+ }
+
/**
* Add a dag to failed dag state store
*/