You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/13 04:26:21 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1073] Add proxy user and requester quota to GaaS

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d703b8f  [GOBBLIN-1073] Add proxy user and requester quota to GaaS
d703b8f is described below

commit d703b8f5201dd343ea543d52af63eb68e38121f1
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Mar 12 21:26:12 2020 -0700

    [GOBBLIN-1073] Add proxy user and requester quota to GaaS
    
    Closes #2913 from jack-moseley/gaas-quota
---
 .../service/modules/orchestration/DagManager.java  | 112 ++++++++++++++++++++-
 .../modules/orchestration/DagManagerUtils.java     |  14 +++
 .../modules/orchestration/DagManagerTest.java      |   2 +-
 3 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index fbee417..0187c8a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -123,6 +124,15 @@ public class DagManager extends AbstractIdleService {
   private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
   private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
   private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  // Maximum number of running jobs allowed per proxy user or requester on a single spec executor, used for any
+  // user without an entry in PER_USER_QUOTA.
+  private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + "defaultJobQuota";
+  private static final int DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  // List of per-user quota overrides, each entry of the form "<user>:<quota>".
+  private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + "perUserQuota";
+
+  // Separates the user name from its quota in PER_USER_QUOTA entries.
+  private static final String QUOTA_SEPERATOR = ":";
 
   /**
    * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
@@ -165,6 +171,8 @@ public class DagManager extends AbstractIdleService {
   private final JobStatusRetriever jobStatusRetriever;
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
+  private final int defaultQuota;
+  private final Map<String, Integer> perUserQuota;
 
   private volatile boolean isActive = false;
 
@@ -183,6 +191,22 @@ public class DagManager extends AbstractIdleService {
       this.eventSubmitter = Optional.absent();
     }
 
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
+    this.perUserQuota = new HashMap<>();
+    // Each entry must be "<user>:<quota>"; fail fast with a descriptive error instead of an index/number exception.
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
+      String[] userAndQuota = userQuota.split(QUOTA_SEPERATOR);
+      if (userAndQuota.length != 2) {
+        throw new IllegalArgumentException("Invalid " + PER_USER_QUOTA + " entry '" + userQuota + "', expected <user>"
+            + QUOTA_SEPERATOR + "<quota>");
+      }
+      try {
+        this.perUserQuota.put(userAndQuota[0].trim(), Integer.parseInt(userAndQuota[1].trim()));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Invalid quota in " + PER_USER_QUOTA + " entry '" + userQuota + "'", e);
+      }
+    }
+
     try {
       this.jobStatusRetriever = createJobStatusRetriever(config);
     } catch (ReflectiveOperationException e) {
@@ -328,7 +342,7 @@ public class DagManager extends AbstractIdleService {
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore,
-              queue[i], cancelQueue[i], instrumentationEnabled);
+              queue[i], cancelQueue[i], instrumentationEnabled, defaultQuota, perUserQuota);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -362,6 +376,8 @@ public class DagManager extends AbstractIdleService {
    */
   public static class DagManagerThread implements Runnable {
     private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+    private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+    private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
     // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
@@ -371,6 +387,8 @@ public class DagManager extends AbstractIdleService {
     private final MetricContext metricContext;
     private final Optional<EventSubmitter> eventSubmitter;
     private final Optional<Timer> jobStatusPolledTimer;
+    private final int defaultQuota;
+    private final Map<String, Integer> perUserQuota;
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -381,11 +399,14 @@ public class DagManager extends AbstractIdleService {
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore,
-        BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, boolean instrumentationEnabled) {
+        BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, boolean instrumentationEnabled,
+        int defaultQuota, Map<String, Integer> perUserQuota) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.queue = queue;
       this.cancelQueue = cancelQueue;
+      this.defaultQuota = defaultQuota;
+      this.perUserQuota = perUserQuota;
       if (instrumentationEnabled) {
         this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
         this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
@@ -706,11 +727,12 @@ public class DagManager extends AbstractIdleService {
       JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
       Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
 
-      String specExecutorUri = dagNode.getValue().getSpecExecutor().getUri().toString();
+      String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
 
       // Run this spec on selected executor
       SpecProducer producer = null;
       try {
+        checkQuota(dagNode);
         producer = DagManagerUtils.getSpecProducer(dagNode);
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
@@ -747,6 +769,75 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    /**
+     * Counts the given {@link DagNode} against the quotas of its proxy user and of each of its requesters on the
+     * node's spec executor.
+     *
+     * <p>Every applicable job count is incremented before a quota error is thrown, so all counts are updated
+     * consistently no matter which quota is exceeded.</p>
+     *
+     * @throws IOException if any quota is exceeded, or if the requester list cannot be deserialized
+     */
+    private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
+      String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+      String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+      boolean proxyUserCheck = true;
+      if (proxyUser != null) {
+        proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
+      }
+
+      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+      boolean requesterCheck = true;
+      String requesterMessage = null;
+      if (serializedRequesters != null) {
+        for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+          requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
+          // Report only the first requester whose quota is exceeded
+          if (!requesterCheck && requesterMessage == null) {
+            requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
+                + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+          }
+        }
+      }
+
+      // Throw quota errors only at the end, after all counts have been incremented, to avoid inconsistent job counts
+      if (!proxyUserCheck) {
+        throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri +
+            ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
+      }
+
+      if (!requesterCheck) {
+        throw new IOException(requesterMessage);
+      }
+    }
+
+    /**
+     * Increments by one the running job count of {@code user} on the spec executor of {@code dagNode}, except on
+     * retries, where the job is already counted.
+     *
+     * <p>The count maps are static and shared by all {@link DagManagerThread}s, which may run concurrently, so the
+     * increment uses merge (atomic on the ConcurrentHashMap instances used) instead of a get and put that could lose updates.</p>
+     *
+     * @return true if the resulting count does not exceed the quota of {@code user}, false otherwise.
+     */
+    private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+      String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
+
+      // Only increment job count for first attempt, since job is considered running between retries
+      int jobCount = dagNode.getValue().getCurrentAttempts() == 1
+          ? quotaMap.merge(key, 1, Integer::sum)
+          : quotaMap.getOrDefault(key, 0);
+
+      return jobCount <= getQuotaForUser(user);
+    }
+
+    /**
+     * @return the quota configured for {@code user}, falling back to the default quota.
+     */
+    private int getQuotaForUser(String user) {
+      return perUserQuota.getOrDefault(user, defaultQuota);
+    }
+
     /**
      * Method that defines the actions to be performed when a job finishes either successfully or with failure.
      * This method updates the state of the dag and performs clean up actions as necessary.
@@ -762,6 +837,8 @@ public class DagManager extends AbstractIdleService {
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
 
+      releaseQuota(dagNode);
+
       if (this.metricContext != null) {
         getRunningJobsCounter(dagNode).dec();
         getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
@@ -785,6 +862,40 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    /**
+     * Decrements by one the running job count of the proxy user and of each requester of the provided
+     * {@link DagNode} on its spec executor. Counts that are missing or already zero are left unchanged.
+     *
+     * <p>A requester list that cannot be deserialized is logged rather than thrown; the proxy user count has already
+     * been released at that point.</p>
+     */
+    private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
+      String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+      if (proxyUser != null) {
+        decrementJobCount(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
+      }
+
+      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+      if (serializedRequesters != null) {
+        try {
+          for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+            decrementJobCount(requesterToJobCount, DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+          }
+        } catch (IOException e) {
+          log.error("Failed to release quota for requester list " + serializedRequesters, e);
+        }
+      }
+    }
+
+    /**
+     * Atomically decrements the count stored under {@code key} in {@code jobCountMap}, never going below zero.
+     * The count maps are static and shared by all {@link DagManagerThread}s, which may run concurrently, so a
+     * separate get and put could lose updates; computeIfPresent is atomic on the ConcurrentHashMap instances used.
+     */
+    private static void decrementJobCount(Map<String, Integer> jobCountMap, String key) {
+      jobCountMap.computeIfPresent(key, (k, count) -> count > 0 ? count - 1 : count);
+    }
+
     private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
       this.jobToDag.remove(dagNode);
       this.dagToJobs.get(dagId).remove(dagNode);
@@ -828,7 +932,7 @@ public class DagManager extends AbstractIdleService {
       }
 
       try {
-        String serializedRequesters = ConfigUtils.getString(configs, RequesterService.REQUESTER_LIST, null);
+        String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
         if (StringUtils.isNotEmpty(serializedRequesters)) {
           List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
           for (ServiceRequester requester : requesters) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 6bfd7de..5290444 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -44,6 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 public class DagManagerUtils {
   static long NO_SLA = -1L;
+  static final String QUOTA_KEY_SEPERATOR = ","; // separates user name and spec executor URI in quota keys
 
   static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     return getFlowId(dag.getStartNodes().get(0));
@@ -220,6 +222,32 @@ public class DagManagerUtils {
     return FailureOption.valueOf(failureOption);
   }
 
+  /**
+   * @return the string form of the URI of the spec executor set on the given {@link DagNode}'s job execution plan.
+   */
+  static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
+    JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+    return jobExecutionPlan.getSpecExecutor().getUri().toString();
+  }
+
+  /**
+   * @return the serialized requester list ({@link RequesterService#REQUESTER_LIST}) from the job config of the
+   * given {@link DagNode}, or null if it is not set.
+   */
+  static String getSerializedRequesterList(DagNode<JobExecutionPlan> dagNode) {
+    JobSpec jobSpec = dagNode.getValue().getJobSpec();
+    return ConfigUtils.getString(jobSpec.getConfig(), RequesterService.REQUESTER_LIST, null);
+  }
+
+  /**
+   * @return the key under which running job counts of {@code user} are tracked for the spec executor of the given
+   * {@link DagNode}, of the form {@code <user><QUOTA_KEY_SEPERATOR><specExecutorUri>}.
+   */
+  static String getUserQuotaKey(String user, DagNode<JobExecutionPlan> dagNode) {
+    String specExecutorUri = getSpecExecutorUri(dagNode);
+    return user + QUOTA_KEY_SEPERATOR + specExecutorUri;
+  }
+
   /**
    * Increment the value of {@link JobExecutionPlan#currentAttempts}
    */
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c74314c..60f25d1 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -78,7 +78,7 @@ public class DagManagerTest {
     this.queue = new LinkedBlockingQueue<>();
     this.cancelQueue = new LinkedBlockingQueue<>();
     this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, cancelQueue,
-        true);
+        true, 5, new HashMap<>());
 
     Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);