You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/23 20:14:39 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1123][GOBBLIN-1124][GOBBLIN-1115] Report orchestration delay for Gobblin Service flows

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a52e1a8  [GOBBLIN-1123][GOBBLIN-1124][GOBBLIN-1115] Report orchestration delay for Gobblin Service flows
a52e1a8 is described below

commit a52e1a8fe0958a813e4c1967879cc17a12b444ce
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Apr 23 13:14:31 2020 -0700

    [GOBBLIN-1123][GOBBLIN-1124][GOBBLIN-1115] Report orchestration delay for Gobblin Service flows
    
    Closes #2963 from sv2000/gaasOrchestrationDelay
---
 .../metrics/reporter/util/MetricReportUtils.java   |  5 -----
 .../apache/gobblin/metrics/ServiceMetricNames.java | 14 +++++++-----
 .../service/FlowConfigResourceLocalHandler.java    | 19 +++++++++-------
 .../gobblin/runtime/AbstractJobLauncher.java       |  5 ++---
 .../modules/core/GobblinServiceManager.java        | 14 ++++++------
 .../service/modules/orchestration/DagManager.java  | 25 ++++++++++++++++++----
 .../modules/orchestration/Orchestrator.java        |  3 +--
 7 files changed, 50 insertions(+), 35 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index b51dcc6..b9eefec 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -41,11 +41,6 @@ public class MetricReportUtils {
 
   public static final int SCHEMA_VERSION = 1;
   private static Optional<SpecificDatumReader<MetricReport>> READER = Optional.absent();
-  // These prefixes can be used to distinguish metrics reported by GobblinService from other metrics reported by Gobblin
-  // This can be used in conjunction with MetricNameRegexFilter to filter out metrics in any MetricReporter
-  public static final String GOBBLIN_SERVICE_METRICS_PREFIX = "GobblinService";
-  public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
-
   /**
    * Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array representing a json input.
    * @param reuse MetricReport to reuse.
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 9fe3515..89d62a4 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -17,7 +17,10 @@
 package org.apache.gobblin.metrics;
 
 public class ServiceMetricNames {
-  private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+  // These prefixes can be used to distinguish metrics reported by GobblinService from other metrics reported by Gobblin
+  // This can be used in conjunction with MetricNameRegexFilter to filter out metrics in any MetricReporter
+  public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
+  public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
 
   // Flow Compilation Meters and Timer
   public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful";
@@ -26,12 +29,13 @@ public class ServiceMetricNames {
   public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.dataAuthorization.time";
 
   // Flow Orchestration Meters and Timer
-  public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.successful";
-  public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.failed";
-  public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.time";
+  public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.successful";
+  public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.failed";
+  public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
+  public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";
 
   //Job status poll timer
-  public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + "jobStatusPoll.time";
+  public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";
 
   public static final String CREATE_FLOW_METER = "CreateFlow";
   public static final String DELETE_FLOW_METER = "DeleteFlow";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 0534887..6885db4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.service;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
@@ -29,19 +35,16 @@ import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -64,11 +67,11 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     this.flowCatalog = flowCatalog;
     MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
     this.createFlow = metricContext.contextAwareMeter(
-        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.CREATE_FLOW_METER));
+        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.CREATE_FLOW_METER));
     this.deleteFlow = metricContext.contextAwareMeter(
-        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.DELETE_FLOW_METER));
+        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER));
     this.runImmediatelyFlow = metricContext.contextAwareMeter(
-        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
+        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
   }
 
   /**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 7d3081b..98249aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -64,13 +64,12 @@ import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.GobblinMetricsRegistry;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventName;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.EventMetadataGenerator;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
@@ -417,7 +416,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
 
         if (this.runtimeMetricContext.isPresent()) {
           String workunitCreationGaugeName = MetricRegistry
-              .name(MetricReportUtils.GOBBLIN_JOB_METRICS_PREFIX, TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
+              .name(ServiceMetricNames.GOBBLIN_JOB_METRICS_PREFIX, TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
                   jobState.getJobName());
           long workUnitsCreationTime = workUnitsCreationTimer.getDuration() / TimeUnit.SECONDS.toMillis(1);
           ContextAwareGauge<Integer> workunitCreationGauge = this.runtimeMetricContext.get()
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 01bdb4d..fefbfa2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,9 +17,6 @@
 
 package org.apache.gobblin.service.modules.core;
 
-import lombok.Getter;
-import lombok.Setter;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -28,9 +25,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -44,6 +38,8 @@ import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +60,9 @@ import com.linkedin.restli.server.resources.BaseResource;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -72,7 +71,6 @@ import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ApplicationException;
@@ -226,7 +224,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     // Initialize Helix leader guage
     helixLeaderGauges = Optional.of(new HelixLeaderState());
     String helixLeaderStateGaugeName =
-        MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
+        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
     ContextAwareGauge<Integer> gauge = metricContext.newContextAwareGauge(helixLeaderStateGaugeName, () -> helixLeaderGauges.get().state.getValue());
     metricContext.register(helixLeaderStateGaugeName, gauge);
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a730787..e2a11b6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -56,11 +57,11 @@ import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
@@ -389,6 +390,7 @@ public class DagManager extends AbstractIdleService {
     private final Optional<Timer> jobStatusPolledTimer;
     private final int defaultQuota;
     private final Map<String, Integer> perUserQuota;
+    private final AtomicLong orchestrationDelay = new AtomicLong(0);
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -411,6 +413,9 @@ public class DagManager extends AbstractIdleService {
         this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
         this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
         this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
+        ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
+            () -> orchestrationDelay.get());
+        this.metricContext.register(orchestrationDelayMetric);
       } else {
         this.metricContext = null;
         this.eventSubmitter = Optional.absent();
@@ -511,13 +516,18 @@ public class DagManager extends AbstractIdleService {
 
       this.dags.put(dagId, dag);
       log.debug("Dag {} - determining if any jobs are already running.", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+      //A flag to indicate if the flow is already running.
+      boolean isDagRunning = false;
       //Are there any jobs already in the running state? This check is for Dags already running
       //before a leadership change occurs.
       for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
         if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
           addJobState(dagId, dagNode);
+          isDagRunning = true;
         }
       }
+
       log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
       //Determine the next set of jobs to run and submit them for execution
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
@@ -528,6 +538,13 @@ public class DagManager extends AbstractIdleService {
       // Set flow status to running
       DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
 
+      // Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
+      // the time difference between the instant when a flow first transitions to the running state and the instant
+      // when the flow is submitted to Gobblin service.
+      if (!isDagRunning) {
+        this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
+      }
+
       log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
     }
 
@@ -914,7 +931,7 @@ public class DagManager extends AbstractIdleService {
     private ContextAwareCounter getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
       return metricContext.contextAwareCounter(
           MetricRegistry.name(
-              MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+              ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
               ServiceMetricNames.RUNNING_FLOWS_COUNTER,
               dagNode.getValue().getSpecExecutor().getUri().toString()));
     }
@@ -927,7 +944,7 @@ public class DagManager extends AbstractIdleService {
       if (StringUtils.isNotEmpty(proxy)) {
         counters.add(metricContext.contextAwareCounter(
             MetricRegistry.name(
-                MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+                ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
                 ServiceMetricNames.SERVICE_USERS, proxy)));
       }
 
@@ -937,7 +954,7 @@ public class DagManager extends AbstractIdleService {
           List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
           for (ServiceRequester requester : requesters) {
             counters.add(metricContext.contextAwareCounter(MetricRegistry
-                .name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
+                .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
           }
         }
       } catch (IOException e) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index b7b6593..cefd513 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -54,7 +54,6 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -230,7 +229,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
       if (!flowGauges.containsKey(spec.getUri().toString())) {
-        String flowCompiledGaugeName = MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+        String flowCompiledGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
         flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
         ContextAwareGauge<Integer> gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> flowGauges.get(spec.getUri().toString()).state.value);
         RootMetricContext.get().register(flowCompiledGaugeName, gauge);