You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/17 21:16:27 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1027] add metrics for users running gaas jobs

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new cd53dcf  [GOBBLIN-1027] add metrics for users running gaas jobs
cd53dcf is described below

commit cd53dcfa2c046f53db29689e1d687bf4412565aa
Author: Arjun <ab...@linkedin.com>
AuthorDate: Fri Jan 17 13:16:20 2020 -0800

    [GOBBLIN-1027] add metrics for users running gaas jobs
    
    Closes #2870 from arjun4084346/gaasUsersMetrics
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  1 +
 .../orchestration/AzkabanProjectConfig.java        |  5 ++--
 .../apache/gobblin/runtime/api/SpecCatalog.java    |  2 +-
 .../service/modules/orchestration/DagManager.java  | 33 ++++++++++++++++++++++
 .../modules/flow/MultiHopFlowCompilerTest.java     |  3 +-
 5 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 3a8ecaa..9e79c61 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -37,5 +37,6 @@ public class ServiceMetricNames {
   public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
 
   public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
+  public static final String SERVICE_USERS = "ServiceUsers";
   public static final String COMPILED = "Compiled";
 }
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
index 4b8bb9c..3ad9785 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfig.java
@@ -40,21 +40,20 @@ import com.typesafe.config.ConfigFactory;
  */
 public class AzkabanProjectConfig {
   private final String azkabanServerUrl;
-
   private final String azkabanProjectName;
   private final String azkabanProjectDescription;
   private final String azkabanProjectFlowName;
   private final String azkabanGroupAdminUsers;
   private final Optional<String> azkabanUserToProxy;
-
   private final Optional<List<String>> azkabanZipJarNames;
   private final Optional<String> azkabanZipJarUrlTemplate;
   private final Optional<String> azkabanZipJarVersion;
   private final Optional<List<String>> azkabanZipAdditionalFiles;
   private final Boolean failIfJarNotFound;
-
   private final JobSpec jobSpec;
 
+  public static final String USER_TO_PROXY = "user.to.proxy";
+
   public AzkabanProjectConfig(JobSpec jobSpec) {
     // Extract config objects
     this.jobSpec = jobSpec;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index d5c1e35..cc55cc4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -119,7 +119,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
     }
 
     public void updateGetSpecTime(long startTime) {
-      log.info("updateGetSpecTime...");
+      log.debug("updateGetSpecTime...");
       Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 62b1cfc..1dff94a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
@@ -63,6 +65,8 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -701,6 +705,7 @@ public class DagManager extends AbstractIdleService {
 
         if (this.metricContext != null) {
           getRunningJobsCounter(dagNode).inc();
+          getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc());
         }
 
         addSpecFuture.get();
@@ -738,6 +743,7 @@ public class DagManager extends AbstractIdleService {
 
       if (this.metricContext != null) {
         getRunningJobsCounter(dagNode).dec();
+        getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
       }
 
       switch (jobStatus) {
@@ -788,6 +794,33 @@ public class DagManager extends AbstractIdleService {
               dagNode.getValue().getSpecExecutor().getUri().toString()));
     }
 
+    private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
+      Config configs = dagNode.getValue().getJobSpec().getConfig();
+      String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
+      List<ContextAwareCounter> counters = new ArrayList<>();
+
+      if (StringUtils.isNotEmpty(proxy)) {
+        counters.add(metricContext.contextAwareCounter(
+            MetricRegistry.name(
+                MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX,
+                ServiceMetricNames.SERVICE_USERS, proxy)));
+      }
+
+      try {
+        String serializedRequesters = ConfigUtils.getString(configs, RequesterService.REQUESTER_LIST, null);
+        if (StringUtils.isNotEmpty(serializedRequesters)) {
+          List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
+          for (ServiceRequester requester : requesters) {
+            counters.add(metricContext.contextAwareCounter(MetricRegistry
+                .name(MetricReportUtils.GOBBLIN_SERVICE_METRICS_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
+          }
+        }
+      } catch (IOException e) {
+        log.error("Error while fetching requester list.", e);
+      }
+
+      return counters;
+    }
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
      */
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 644b75a..c03aa39 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -84,6 +84,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.CompletedFuture;
@@ -648,7 +649,7 @@ public class MultiHopFlowCompilerTest {
 
     Assert.assertTrue(dag.isEmpty());
     Assert.assertEquals(spec.getCompilationErrors().size(), 1);
-    Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains("user.to.proxy"));
+    Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains(AzkabanProjectConfig.USER_TO_PROXY));
   }
 
   @Test (dependsOnMethods = "testUnresolvedFlow")