Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/13 02:26:29 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-956] Continue loading dags until queue is drained

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e350ad  [GOBBLIN-956] Continue loading dags until queue is drained
3e350ad is described below

commit 3e350ad39bf9b1d524d3946cb3f1d5ff1c2a4efb
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Nov 12 18:26:19 2019 -0800

    [GOBBLIN-956] Continue loading dags until queue is drained
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [x] My PR addresses the following
    [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        - https://issues.apache.org/jira/browse/GOBBLIN-956
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    
    If many dags are added at once (such as on GaaS
    startup), each `DagManagerThread` still loads only
    one dag per run (10 seconds). This change adds a
    while loop that keeps loading dags until the queue
    is drained.
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    Tested locally
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Closes #2805 from jack-moseley/dag-queue
---
 .../service/modules/orchestration/DagManager.java       | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 06eb277..a5bcd4a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -386,15 +386,18 @@ public class DagManager extends AbstractIdleService {
           cancelDag(nextDagToCancel);
         }
 
-        Dag<JobExecutionPlan> dag = queue.poll();
-        //Poll the queue for a new Dag to execute.
-        if (dag != null) {
-          if (dag.isEmpty()) {
-            log.info("Empty dag; ignoring the dag");
+        while (!queue.isEmpty()) {
+          Dag<JobExecutionPlan> dag = queue.poll();
+          //Poll the queue for a new Dag to execute.
+          if (dag != null) {
+            if (dag.isEmpty()) {
+              log.info("Empty dag; ignoring the dag");
+            }
+            //Initialize dag.
+            initialize(dag);
           }
-          //Initialize dag.
-          initialize(dag);
         }
+
         log.debug("Polling job statuses..");
         //Poll and update the job statuses of running jobs.
         pollAndAdvanceDag();
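
For readers who want to see the effect of the change in isolation, here is a minimal standalone sketch. It is not the DagManager code: the class name `DrainQueueSketch`, the methods `loadOne` and `loadAll`, the use of `String` items in place of `Dag<JobExecutionPlan>`, and the choice of `LinkedBlockingQueue` are all assumptions made for illustration. It only contrasts polling once per run with polling until the queue is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; names and types are not from Gobblin.
public class DrainQueueSketch {

  // Behavior before the change: at most one item is taken per run.
  public static List<String> loadOne(Queue<String> queue) {
    List<String> loaded = new ArrayList<>();
    String item = queue.poll();
    if (item != null) {
      loaded.add(item);
    }
    return loaded;
  }

  // Behavior after the change: keep polling until the queue is empty.
  public static List<String> loadAll(Queue<String> queue) {
    List<String> loaded = new ArrayList<>();
    while (!queue.isEmpty()) {
      String item = queue.poll();
      // poll() can still return null if another consumer emptied the
      // queue between the isEmpty() check and this call.
      if (item != null) {
        loaded.add(item);
      }
    }
    return loaded;
  }

  public static void main(String[] args) {
    Queue<String> queue =
        new LinkedBlockingQueue<>(Arrays.asList("dag-1", "dag-2", "dag-3"));
    System.out.println("one run, old behavior: " + loadOne(queue));
    System.out.println("one run, new behavior: " + loadAll(queue));
    System.out.println("items left in queue: " + queue.size());
  }
}
```

The null check inside the loop mirrors the one kept in the diff above: with more than one consumer, `isEmpty()` returning false does not guarantee that the following `poll()` returns an item.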