You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/17 21:16:27 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1027] add metrics for users running gaas jobs
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cd53dcf [GOBBLIN-1027] add metrics for users running gaas jobs
cd53dcf is described below
commit cd53dcfa2c046f53db29689e1d687bf4412565aa
Author: Arjun <ab...@linkedin.com>
AuthorDate: Fri Jan 17 13:16:20 2020 -0800
[GOBBLIN-1027] add metrics for users running gaas jobs
Closes #2870 from arjun4084346/gaasUsersMetrics
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 1 +
.../orchestration/AzkabanProjectConfig.java | 5 ++--
.../apache/gobblin/runtime/api/SpecCatalog.java | 2 +-
.../service/modules/orchestration/DagManager.java | 33 ++++++++++++++++++++++
.../modules/flow/MultiHopFlowCompilerTest.java | 3 +-
5 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 3a8ecaa..9e79c61 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -37,5 +37,6 @@ public class ServiceMetricNames {
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
+ public static final String SERVICE_USERS = "ServiceUsers";
public static final String COMPILED = "Compiled";
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index 4b8bb9c..3ad9785 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -40,21 +40,20 @@ import com.typesafe.config.ConfigFactory;
*/
public class AzkabanProjectConfig {
private final String azkabanServerUrl;
-
private final String azkabanProjectName;
private final String azkabanProjectDescription;
private final String azkabanProjectFlowName;
private final String azkabanGroupAdminUsers;
private final Optional<String> azkabanUserToProxy;
-
private final Optional<List<String>> azkabanZipJarNames;
private final Optional<String> azkabanZipJarUrlTemplate;
private final Optional<String> azkabanZipJarVersion;
private final Optional<List<String>> azkabanZipAdditionalFiles;
private final Boolean failIfJarNotFound;
-
private final JobSpec jobSpec;
+ public static final String USER_TO_PROXY = "user.to.proxy";
+
public AzkabanProjectConfig(JobSpec jobSpec) {
// Extract config objects
this.jobSpec = jobSpec;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index d5c1e35..cc55cc4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -119,7 +119,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
}
public void updateGetSpecTime(long startTime) {
- log.info("updateGetSpecTime...");
+ log.debug("updateGetSpecTime...");
Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 62b1cfc..1dff94a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
@@ -63,6 +65,8 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -701,6 +705,7 @@ public class DagManager extends AbstractIdleService {
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).inc();
+ getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc());
}
addSpecFuture.get();
@@ -738,6 +743,7 @@ public class DagManager extends AbstractIdleService {
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).dec();
+ getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
}
switch (jobStatus) {
@@ -788,6 +794,33 @@ public class DagManager extends AbstractIdleService {
dagNode.getValue().getSpecExecutor().getUri().toString()));
}
+ private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
+ Config configs = dagNode.getValue().getJobSpec().getConfig();
+ String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
+ List<ContextAwareCounter> counters = new ArrayList<>();
+
+ if (StringUtils.isNotEmpty(proxy)) {
+ counters.add(metricContext.contextAwareCounter(
+ MetricRegistry.name(
+ MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+ ServiceMetricNames.SERVICE_USERS, proxy)));
+ }
+
+ try {
+ String serializedRequesters = ConfigUtils.getString(configs, RequesterService.REQUESTER_LIST, null);
+ if (StringUtils.isNotEmpty(serializedRequesters)) {
+ List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
+ for (ServiceRequester requester : requesters) {
+ counters.add(metricContext.contextAwareCounter(MetricRegistry
+ .name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error while fetching requester list.", e);
+ }
+
+ return counters;
+ }
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 644b75a..c03aa39 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -84,6 +84,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.CompletedFuture;
@@ -648,7 +649,7 @@ public class MultiHopFlowCompilerTest {
Assert.assertTrue(dag.isEmpty());
Assert.assertEquals(spec.getCompilationErrors().size(), 1);
- Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains("user.to.proxy"));
+ Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains(AzkabanProjectConfig.USER_TO_PROXY));
}
@Test (dependsOnMethods = "testUnresolvedFlow")