You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/23 20:14:39 UTC
[incubator-gobblin] branch master updated:
[GOBBLIN-1123][GOBBLIN-1124][GOBBLIN-1115] Report orchestration delay for
Gobblin Service flows
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a52e1a8 [GOBBLIN-1123][GOBBLIN-1124][GOBBLIN-1115] Report orchestration delay for Gobblin Service flows
a52e1a8 is described below
commit a52e1a8fe0958a813e4c1967879cc17a12b444ce
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Apr 23 13:14:31 2020 -0700
[GOBBLIN-1123][GOBBLIN-1124][GOBBLIN-1115] Report orchestration delay for Gobblin Service flows
Closes #2963 from sv2000/gaasOrchestrationDelay
---
.../metrics/reporter/util/MetricReportUtils.java | 5 -----
.../apache/gobblin/metrics/ServiceMetricNames.java | 14 +++++++-----
.../service/FlowConfigResourceLocalHandler.java | 19 +++++++++-------
.../gobblin/runtime/AbstractJobLauncher.java | 5 ++---
.../modules/core/GobblinServiceManager.java | 14 ++++++------
.../service/modules/orchestration/DagManager.java | 25 ++++++++++++++++++----
.../modules/orchestration/Orchestrator.java | 3 +--
7 files changed, 50 insertions(+), 35 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index b51dcc6..b9eefec 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -41,11 +41,6 @@ public class MetricReportUtils {
public static final int SCHEMA_VERSION = 1;
private static Optional<SpecificDatumReader<MetricReport>> READER = Optional.absent();
- // These prefixes can be used to distinguish metrics reported by GobblinService from other metrics reported by Gobblin
- // This can be used in conjunction with MetricNameRegexFilter to filter out metrics in any MetricReporter
- public static final String GOBBLIN_SERVICE_METRICS_PREFIX = "GobblinService";
- public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
-
/**
* Parses a {@link org.apache.gobblin.metrics.MetricReport} from a byte array representing a json input.
* @param reuse MetricReport to reuse.
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 9fe3515..89d62a4 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -17,7 +17,10 @@
package org.apache.gobblin.metrics;
public class ServiceMetricNames {
- private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+ // These prefixes can be used to distinguish metrics reported by GobblinService from other metrics reported by Gobblin
+ // This can be used in conjunction with MetricNameRegexFilter to filter out metrics in any MetricReporter
+ public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
+ public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
// Flow Compilation Meters and Timer
public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful";
@@ -26,12 +29,13 @@ public class ServiceMetricNames {
public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowCompilation.dataAuthorization.time";
// Flow Orchestration Meters and Timer
- public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.successful";
- public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.failed";
- public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.time";
+ public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.successful";
+ public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.failed";
+ public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.time";
+ public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.delay";
//Job status poll timer
- public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + "jobStatusPoll.time";
+ public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + ".jobStatusPoll.time";
public static final String CREATE_FLOW_METER = "CreateFlow";
public static final String DELETE_FLOW_METER = "DeleteFlow";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 0534887..6885db4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -17,6 +17,12 @@
package org.apache.gobblin.service;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
@@ -29,19 +35,16 @@ import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -64,11 +67,11 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
this.flowCatalog = flowCatalog;
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.createFlow = metricContext.contextAwareMeter(
- MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.CREATE_FLOW_METER));
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.CREATE_FLOW_METER));
this.deleteFlow = metricContext.contextAwareMeter(
- MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.DELETE_FLOW_METER));
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER));
this.runImmediatelyFlow = metricContext.contextAwareMeter(
- MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
}
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 7d3081b..98249aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -64,13 +64,12 @@ import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.EventMetadataGenerator;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
@@ -417,7 +416,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
if (this.runtimeMetricContext.isPresent()) {
String workunitCreationGaugeName = MetricRegistry
- .name(MetricReportUtils.GOBBLIN_JOB_METRICS_PREFIX, TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
+ .name(ServiceMetricNames.GOBBLIN_JOB_METRICS_PREFIX, TimingEvent.LauncherTimings.WORK_UNITS_CREATION,
jobState.getJobName());
long workUnitsCreationTime = workUnitsCreationTimer.getDuration() / TimeUnit.SECONDS.toMillis(1);
ContextAwareGauge<Integer> workunitCreationGauge = this.runtimeMetricContext.get()
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 01bdb4d..fefbfa2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,9 +17,6 @@
package org.apache.gobblin.service.modules.core;
-import lombok.Getter;
-import lombok.Setter;
-
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -28,9 +25,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
@@ -44,6 +38,8 @@ import org.apache.helix.ControllerChangeListener;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +60,9 @@ import com.linkedin.restli.server.resources.BaseResource;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Getter;
+import lombok.Setter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
@@ -72,7 +71,6 @@ import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.app.ApplicationException;
@@ -226,7 +224,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
// Initialize Helix leader guage
helixLeaderGauges = Optional.of(new HelixLeaderState());
String helixLeaderStateGaugeName =
- MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
ContextAwareGauge<Integer> gauge = metricContext.newContextAwareGauge(helixLeaderStateGaugeName, () -> helixLeaderGauges.get().state.getValue());
metricContext.register(helixLeaderStateGaugeName, gauge);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a730787..e2a11b6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
@@ -56,11 +57,11 @@ import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
@@ -389,6 +390,7 @@ public class DagManager extends AbstractIdleService {
private final Optional<Timer> jobStatusPolledTimer;
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
+ private final AtomicLong orchestrationDelay = new AtomicLong(0);
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
@@ -411,6 +413,9 @@ public class DagManager extends AbstractIdleService {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
+ ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
+ () -> orchestrationDelay.get());
+ this.metricContext.register(orchestrationDelayMetric);
} else {
this.metricContext = null;
this.eventSubmitter = Optional.absent();
@@ -511,13 +516,18 @@ public class DagManager extends AbstractIdleService {
this.dags.put(dagId, dag);
log.debug("Dag {} - determining if any jobs are already running.", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+ //A flag to indicate if the flow is already running.
+ boolean isDagRunning = false;
//Are there any jobs already in the running state? This check is for Dags already running
//before a leadership change occurs.
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
addJobState(dagId, dagNode);
+ isDagRunning = true;
}
}
+
log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
@@ -528,6 +538,13 @@ public class DagManager extends AbstractIdleService {
// Set flow status to running
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+ // Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
+ // the time difference between the instant when a flow first transitions to the running state and the instant
+ // when the flow is submitted to Gobblin service.
+ if (!isDagRunning) {
+ this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
+ }
+
log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
}
@@ -914,7 +931,7 @@ public class DagManager extends AbstractIdleService {
private ContextAwareCounter getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
return metricContext.contextAwareCounter(
MetricRegistry.name(
- MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.RUNNING_FLOWS_COUNTER,
dagNode.getValue().getSpecExecutor().getUri().toString()));
}
@@ -927,7 +944,7 @@ public class DagManager extends AbstractIdleService {
if (StringUtils.isNotEmpty(proxy)) {
counters.add(metricContext.contextAwareCounter(
MetricRegistry.name(
- MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS, proxy)));
}
@@ -937,7 +954,7 @@ public class DagManager extends AbstractIdleService {
List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
for (ServiceRequester requester : requesters) {
counters.add(metricContext.contextAwareCounter(MetricRegistry
- .name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
+ .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
}
}
} catch (IOException e) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index b7b6593..cefd513 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -54,7 +54,6 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -230,7 +229,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
if (!flowGauges.containsKey(spec.getUri().toString())) {
- String flowCompiledGaugeName = MetricRegistry.name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+ String flowCompiledGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
ContextAwareGauge<Integer> gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> flowGauges.get(spec.getUri().toString()).state.value);
RootMetricContext.get().register(flowCompiledGaugeName, gauge);