You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/04 02:42:31 UTC
[gobblin] branch master updated: [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init… (#3652)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2d3095a53 [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init… (#3652)
2d3095a53 is described below
commit 2d3095a5366f7e3adf63579c8c237854d5630990
Author: umustafi <um...@gmail.com>
AuthorDate: Fri Mar 3 18:42:24 2023 -0800
[GOBBLIN-1793] Add metrics to measure and isolate bottleneck for init… (#3652)
* [GOBBLIN-1793] Add metrics to measure and isolate bottleneck for initializing scheduler
* remove unnecessary checks
* convert millisecond measurements to nanoseconds
---------
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
.../gobblin/runtime/metrics/RuntimeMetrics.java | 15 ++++++-
.../scheduler/GobblinServiceJobScheduler.java | 49 +++++++++++++++++-----
2 files changed, 51 insertions(+), 13 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index b24ef000d..3ad15c933 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -54,9 +54,20 @@ public class RuntimeMetrics {
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
- public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgMillis";
+ // The following metrics are used to identify the bottlenecks for initializing the job scheduler
+ public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgNanos";
public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
- public static final String GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerMillis";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToObtainSpecUrisNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.individualGetSpecSpeedNanos";
+ public static final String GOBBLIN_JOB_SCHEDULER_ADD_SPEC_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.totalAddSpecTimeNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_FLOW_COMPILATION_TIME_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.specCompilationTimeNanos";
+ public static final String
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_SCHEDULE_ONE_JOB_NANOS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToScheduleOneJobNanos";
// Metadata keys
public static final String TOPIC = "topic";
public static final String GROUP_ID = "groupId";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 54b9a6a26..8c64ed129 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -113,9 +113,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
private String serviceName;
private volatile Long averageGetSpecTimeValue = -1L;
private volatile Long timeToInitializeSchedulerValue = -1L;
- private final ContextAwareGauge averageGetSpecTimeMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS, () -> this.averageGetSpecTimeValue);;
+ private volatile Long timeToObtainSpecUrisNanosValue = -1L;
+ private volatile Long individualGetSpecSpeedNanosValue = -1L;
+ private volatile Long addSpecTimeNanosValue = -1L;
+ private volatile Long flowCompilationTimeNanosValue = -1L;
+ private volatile Long timeToScheduleOneJobValue = -1L;
+ private final ContextAwareGauge averageGetSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS, () -> this.averageGetSpecTimeValue);;
private final ContextAwareGauge batchSize = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE, () -> this.loadSpecsBatchSize);
- private final ContextAwareGauge timeToInitalizeSchedulerMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS, () -> this.timeToInitializeSchedulerValue);
+ private final ContextAwareGauge timeToInitalizeSchedulerNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS, () -> this.timeToInitializeSchedulerValue);
+ private final ContextAwareGauge timeToObtainSpecUrisNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS, () -> timeToObtainSpecUrisNanosValue);
+ private final ContextAwareGauge individualGetSpecSpeedNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS, () -> individualGetSpecSpeedNanosValue);
+ private final ContextAwareGauge addSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_ADD_SPEC_TIME_NANOS, () -> addSpecTimeNanosValue);
+ private final ContextAwareGauge flowCompilationTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_FLOW_COMPILATION_TIME_NANOS, () -> flowCompilationTimeNanosValue);
+ private final ContextAwareGauge timeToScheduleOneJob = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_SCHEDULE_ONE_JOB_NANOS, () -> timeToScheduleOneJobValue);
private static final MetricContext metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
GobblinServiceJobScheduler.class);
private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
@@ -123,7 +133,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
/**
* If current instances is nominated as a handler for DR traffic from down GaaS-Instance.
- * Note this is, currently, different from leadership change/fail-over handling, where the traffice could come
+ * Note this is, currently, different from leadership change/fail-over handling, where the traffic could come
* from GaaS instance out of current GaaS Cluster:
* e.g. There are multi-datacenter deployment of GaaS Cluster. Intra-datacenter fail-over could be handled by
* leadership change mechanism, while inter-datacenter fail-over would be handled by DR handling mechanism.
@@ -158,11 +168,16 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.quotaManager = quotaManager;
// Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
// of the scheduler. If one metric exists, then the others should as well.
- MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+ MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS);
if (metricContext.getGauges(filter).isEmpty()) {
- metricContext.register(this.averageGetSpecTimeMillis);
+ metricContext.register(this.averageGetSpecTimeNanos);
metricContext.register(this.batchSize);
- metricContext.register(timeToInitalizeSchedulerMillis);
+ metricContext.register(this.timeToInitalizeSchedulerNanos);
+ metricContext.register(this.timeToObtainSpecUrisNanos);
+ metricContext.register(this.individualGetSpecSpeedNanos);
+ metricContext.register(this.addSpecTimeNanos);
+ metricContext.register(this.flowCompilationTimeNanos);
+ metricContext.register(this.timeToScheduleOneJob);
}
}
@@ -237,7 +252,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
*/
private void scheduleSpecsFromCatalog() {
int numSpecs = this.flowCatalog.get().getSize();
- long startTime = System.currentTimeMillis();
+ _log.info("Scheduling specs from catalog: {} flows to schedule", numSpecs);
+ long startTime = System.nanoTime();
Iterator<URI> uriIterator;
HashSet<URI> urisLeftToSchedule = new HashSet<>();
try {
@@ -248,6 +264,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
} catch (IOException e) {
throw new RuntimeException(e);
}
+ this.timeToObtainSpecUrisNanosValue = System.nanoTime() - startTime;
try {
// If current instances nominated as DR handler, will take additional URIS from FlowCatalog.
@@ -266,10 +283,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
long batchGetEndTime;
while (startOffset < numSpecs) {
- batchGetStartTime = System.currentTimeMillis();
+ batchGetStartTime = System.nanoTime();
Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
- batchGetEndTime = System.currentTimeMillis();
+ batchGetEndTime = System.nanoTime();
while (batchOfSpecsIterator.hasNext()) {
Spec spec = batchOfSpecsIterator.next();
@@ -289,10 +306,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
// Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ long individualGetSpecStartTime;
while (urisLeft.hasNext()) {
URI uri = urisLeft.next();
try {
+ individualGetSpecStartTime = System.nanoTime();
Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
+ this.individualGetSpecSpeedNanosValue = System.nanoTime() - individualGetSpecStartTime;
addSpecHelperMethod(spec);
} catch (Exception e) {
// If there is an uncaught error thrown during compilation, log it and continue adding flows
@@ -302,7 +322,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
- this.timeToInitializeSchedulerValue = System.currentTimeMillis() - startTime;
+ this.timeToInitializeSchedulerValue = System.nanoTime() - startTime;
}
/**
@@ -365,6 +385,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
*/
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
+ long startTime = System.nanoTime();
if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
// Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore
// .. Specs if in cluster mode and Helix is not yet initialized
@@ -384,8 +405,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
boolean isExplain = flowSpec.isExplain();
String response = null;
+ long compileStartTime = System.nanoTime();
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ this.flowCompilationTimeNanosValue = System.nanoTime() - compileStartTime;
// If dag is null then a compilation error has occurred
if (dag != null && !dag.isEmpty()) {
response = dag.toString();
@@ -400,7 +423,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
addedSpec, isExplain, compileSuccess, this.isActive);
return new AddSpecResponse<>(response);
}
-
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
@@ -430,6 +452,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
|| (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {
_log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+ this.addSpecTimeNanosValue = System.nanoTime() - startTime;
return new AddSpecResponse(response);
}
}
@@ -441,11 +464,14 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
_log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
try {
+ long scheduleStartTime = System.nanoTime();
scheduleJob(jobConfig, null);
+ this.timeToScheduleOneJobValue = System.nanoTime() - scheduleStartTime;
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
this.lastUpdatedTimeForFlowSpec.remove(flowSpecUri.toString());
+ this.addSpecTimeNanosValue = System.nanoTime() - startTime;
return null;
}
if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
@@ -457,6 +483,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
}
+ this.addSpecTimeNanosValue = System.nanoTime() - startTime;
return new AddSpecResponse<>(response);
}