You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/01 23:16:22 UTC

[gobblin] branch master updated: [GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 557f255  [GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup
557f255 is described below

commit 557f255d973255e50420f821b512f7c805552ae1
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Jun 1 16:16:16 2021 -0700

    [GOBBLIN-1452] Add meters for successful/failed dags in total and by flowGroup
    
    Closes #3290 from jack-moseley/flow-meters
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  2 ++
 .../service/modules/orchestration/DagManager.java  | 37 +++++++++++++++++++---
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 269e5bc..7e6bb55 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -40,6 +40,8 @@ public class ServiceMetricNames {
   public static final String CREATE_FLOW_METER = "CreateFlow";
   public static final String DELETE_FLOW_METER = "DeleteFlow";
   public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
+  public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
+  public static final String FAILED_FLOW_METER = "FailedFlows";
 
   public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
   public static final String SERVICE_USERS = "ServiceUsers";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 727c438..37632d7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -59,6 +59,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
@@ -431,6 +432,8 @@ public class DagManager extends AbstractIdleService {
     private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
     private static Map<String, FlowState> flowGauges = Maps.newHashMap();
+    private ContextAwareMeter allSuccessfulMeter;
+    private ContextAwareMeter allFailedMeter;
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -460,6 +463,10 @@ public class DagManager extends AbstractIdleService {
         this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
         ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
             () -> orchestrationDelay.get());
+        this.allSuccessfulMeter = metricContext.contextAwareMeter(
+            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+        this.allFailedMeter = metricContext.contextAwareMeter(
+            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FAILED_FLOW_METER));
         this.metricContext.register(orchestrationDelayMetric);
       } else {
         this.metricContext = null;
@@ -1133,6 +1140,12 @@ public class DagManager extends AbstractIdleService {
 
       return counters;
     }
+
+    private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName) {
+      String flowGroup = DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
+      return metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, meterName));
+    }
+
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
      */
@@ -1140,7 +1153,6 @@ public class DagManager extends AbstractIdleService {
       List<String> dagIdstoClean = new ArrayList<>();
       //Clean up failed dags
       for (String dagId : this.failedDagIdsFinishRunning) {
-        addFailedDag(dagId);
         //Skip monitoring of any other jobs of the failed dag.
         LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
         while (!dagNodeList.isEmpty()) {
@@ -1148,7 +1160,7 @@ public class DagManager extends AbstractIdleService {
           deleteJobState(dagId, dagNode);
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
-        flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
+        onFlowFailure(dagId);
         // send an event before cleaning up dag
         DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
         dagIdstoClean.add(dagId);
@@ -1159,12 +1171,12 @@ public class DagManager extends AbstractIdleService {
         if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
           String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
           if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            addFailedDag(dagId);
+            onFlowFailure(dagId);
             status = TimingEvent.FlowTimings.FLOW_FAILED;
             this.failedDagIdsFinishAllPossible.remove(dagId);
             flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
           } else {
-            flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
+            onFlowSuccess(dagId);
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
           // send an event before cleaning up dag
@@ -1178,6 +1190,23 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    private void onFlowSuccess(String dagId) {
+      if (this.metricContext != null) {
+        flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
+        this.allSuccessfulMeter.mark();
+        getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER).mark();
+      }
+    }
+
+    private void onFlowFailure(String dagId) {
+      addFailedDag(dagId);
+      if (this.metricContext != null) {
+        flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
+        this.allFailedMeter.mark();
+        getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER).mark();
+      }
+    }
+
     /**
      * Add a dag to failed dag state store
      */