You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/04 02:42:31 UTC

[gobblin] branch master updated: [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init… (#3652)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d3095a53 [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init… (#3652)
2d3095a53 is described below

commit 2d3095a5366f7e3adf63579c8c237854d5630990
Author: umustafi <um...@gmail.com>
AuthorDate: Fri Mar 3 18:42:24 2023 -0800

    [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init… (#3652)
    
    * [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for initializing scheduler
    
    * remove unnecessary checks
    
    * convert millisecond measurement to nanosecond
    
    ---------
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
 .../gobblin/runtime/metrics/RuntimeMetrics.java    | 15 ++++++-
 .../scheduler/GobblinServiceJobScheduler.java      | 49 +++++++++++++++++-----
 2 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index b24ef000d..3ad15c933 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -54,9 +54,20 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
 
-  public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgMillis";
+  // The following metrics are used to identify the bottlenecks for initializing the job scheduler
+  public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgNanos";
   public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
-  public static final String GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerMillis";
+  public static final String
+      GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerNanos";
+  public static final String
+      GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToObtainSpecUrisNanos";
+  public static final String
+      GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.individualGetSpecSpeedNanos";
+  public static final String GOBBLIN_JOB_SCHEDULER_ADD_SPEC_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.totalAddSpecTimeNanos";
+  public static final String
+      GOBBLIN_JOB_SCHEDULER_FLOW_COMPILATION_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.specCompilationTimeNanos";
+  public static final String
+      GOBBLIN_JOB_SCHEDULER_TIME_TO_SCHEDULE_ONE_JOB_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToScheduleOneJobNanos";
   // Metadata keys
   public static final String TOPIC = "topic";
   public static final String GROUP_ID = "groupId";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 54b9a6a26..8c64ed129 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -113,9 +113,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   private String serviceName;
   private volatile Long averageGetSpecTimeValue = -1L;
   private volatile Long timeToInitializeSchedulerValue = -1L;
-  private final ContextAwareGauge averageGetSpecTimeMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS, () -> this.averageGetSpecTimeValue);;
+  private volatile Long timeToObtainSpecUrisNanosValue = -1L;
+  private volatile Long individualGetSpecSpeedNanosValue = -1L;
+  private volatile Long addSpecTimeNanosValue = -1L;
+  private volatile Long flowCompilationTimeNanosValue = -1L;
+  private volatile Long timeToScheduleOneJobValue = -1L;
+  private final ContextAwareGauge averageGetSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS, () -> this.averageGetSpecTimeValue);;
   private final ContextAwareGauge batchSize = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE, () -> this.loadSpecsBatchSize);
-  private final ContextAwareGauge timeToInitalizeSchedulerMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS, () -> this.timeToInitializeSchedulerValue);
+  private final ContextAwareGauge timeToInitalizeSchedulerNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS, () -> this.timeToInitializeSchedulerValue);
+  private final ContextAwareGauge timeToObtainSpecUrisNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS, () -> timeToObtainSpecUrisNanosValue);
+  private final ContextAwareGauge individualGetSpecSpeedNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS, () -> individualGetSpecSpeedNanosValue);
+  private final ContextAwareGauge addSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_ADD_SPEC_TIME_NANOS, () -> addSpecTimeNanosValue);
+  private final ContextAwareGauge flowCompilationTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_FLOW_COMPILATION_TIME_NANOS, () -> flowCompilationTimeNanosValue);
+  private final ContextAwareGauge timeToScheduleOneJob = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_SCHEDULE_ONE_JOB_NANOS, () -> timeToScheduleOneJobValue);
   private static final MetricContext metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
       GobblinServiceJobScheduler.class);
   private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
@@ -123,7 +133,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
   /**
    * If current instances is nominated as a handler for DR traffic from down GaaS-Instance.
-   * Note this is, currently, different from leadership change/fail-over handling, where the traffice could come
+   * Note this is, currently, different from leadership change/fail-over handling, where the traffic could come
    * from GaaS instance out of current GaaS Cluster:
    * e.g. There are multi-datacenter deployment of GaaS Cluster. Intra-datacenter fail-over could be handled by
    * leadership change mechanism, while inter-datacenter fail-over would be handled by DR handling mechanism.
@@ -158,11 +168,16 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     this.quotaManager = quotaManager;
     // Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
     // of the scheduler. If one metric exists, then the others should as well.
-    MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+    MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS);
     if (metricContext.getGauges(filter).isEmpty()) {
-      metricContext.register(this.averageGetSpecTimeMillis);
+      metricContext.register(this.averageGetSpecTimeNanos);
       metricContext.register(this.batchSize);
-      metricContext.register(timeToInitalizeSchedulerMillis);
+      metricContext.register(this.timeToInitalizeSchedulerNanos);
+      metricContext.register(this.timeToObtainSpecUrisNanos);
+      metricContext.register(this.individualGetSpecSpeedNanos);
+      metricContext.register(this.addSpecTimeNanos);
+      metricContext.register(this.flowCompilationTimeNanos);
+      metricContext.register(this.timeToScheduleOneJob);
     }
   }
 
@@ -237,7 +252,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    */
   private void scheduleSpecsFromCatalog() {
     int numSpecs = this.flowCatalog.get().getSize();
-    long startTime = System.currentTimeMillis();
+    _log.info("Scheduling specs from catalog: {} flows to schedule", numSpecs);
+    long startTime = System.nanoTime();
     Iterator<URI> uriIterator;
     HashSet<URI> urisLeftToSchedule = new HashSet<>();
     try {
@@ -248,6 +264,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    this.timeToObtainSpecUrisNanosValue = System.nanoTime() - startTime;
 
     try {
       // If current instances nominated as DR handler, will take additional URIS from FlowCatalog.
@@ -266,10 +283,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     long batchGetEndTime;
 
     while (startOffset < numSpecs) {
-      batchGetStartTime  = System.currentTimeMillis();
+      batchGetStartTime  = System.nanoTime();
       Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
       Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
-      batchGetEndTime = System.currentTimeMillis();
+      batchGetEndTime = System.nanoTime();
 
       while (batchOfSpecsIterator.hasNext()) {
         Spec spec = batchOfSpecsIterator.next();
@@ -289,10 +306,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
     Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    long individualGetSpecStartTime;
     while (urisLeft.hasNext()) {
         URI uri = urisLeft.next();
         try {
+          individualGetSpecStartTime = System.nanoTime();
           Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
+          this.individualGetSpecSpeedNanosValue = System.nanoTime() - individualGetSpecStartTime;
           addSpecHelperMethod(spec);
         } catch (Exception e) {
           // If there is an uncaught error thrown during compilation, log it and continue adding flows
@@ -302,7 +322,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
 
     this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
-    this.timeToInitializeSchedulerValue = System.currentTimeMillis() - startTime;
+    this.timeToInitializeSchedulerValue = System.nanoTime() - startTime;
   }
 
   /**
@@ -365,6 +385,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    */
   @Override
   public AddSpecResponse onAddSpec(Spec addedSpec) {
+    long startTime = System.nanoTime();
     if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
       // Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore
       // .. Specs if in cluster mode and Helix is not yet initialized
@@ -384,8 +405,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     boolean isExplain = flowSpec.isExplain();
     String response = null;
 
+    long compileStartTime = System.nanoTime();
     // always try to compile the flow to verify if it is compilable
     Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+    this.flowCompilationTimeNanosValue = System.nanoTime() - compileStartTime;
     // If dag is null then a compilation error has occurred
     if (dag != null && !dag.isEmpty()) {
       response = dag.toString();
@@ -400,7 +423,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
           addedSpec, isExplain, compileSuccess, this.isActive);
       return new AddSpecResponse<>(response);
     }
-
     // Check quota limits against adhoc flows before saving the schedule
     // In warm standby mode, this quota check will happen on restli API layer when we accept the flow
     if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
@@ -430,6 +452,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
           || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {
         _log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
             addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+        this.addSpecTimeNanosValue = System.nanoTime() - startTime;
         return new AddSpecResponse(response);
       }
     }
@@ -441,11 +464,14 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
       _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
       try {
+        long scheduleStartTime = System.nanoTime();
         scheduleJob(jobConfig, null);
+        this.timeToScheduleOneJobValue = System.nanoTime() - scheduleStartTime;
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
         this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
         this.lastUpdatedTimeForFlowSpec.remove(flowSpecUri.toString());
+        this.addSpecTimeNanosValue = System.nanoTime() - startTime;
         return null;
       }
       if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
@@ -457,6 +483,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
     }
 
+    this.addSpecTimeNanosValue = System.nanoTime() - startTime;
     return new AddSpecResponse<>(response);
   }