You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/08 19:16:25 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-950] Avoid persisting dag right after loading it on startup

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 66376ec  [GOBBLIN-950] Avoid persisting dag right after loading it on startup
66376ec is described below

commit 66376ec7a57df20adc6cd097da0fffb86bd19f68
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Nov 8 11:16:16 2019 -0800

    [GOBBLIN-950] Avoid persisting dag right after loading it on startup
    
    Closes #2798 from jack-moseley/dag-optimizations
---
 .../gobblin/service/modules/orchestration/DagManager.java  | 14 +++++++++-----
 .../service/modules/orchestration/Orchestrator.java        |  2 +-
 .../service/modules/orchestration/DagManagerFlowTest.java  | 12 ++++++------
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index dcadc88..06eb277 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -225,13 +225,17 @@ public class DagManager extends AbstractIdleService {
   }
 
   /**
-   * Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} first persists the
+   * Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} optionally persists the
    * submitted dag to the {@link DagStateStore} and then adds the dag to a {@link BlockingQueue} to be picked up
    * by one of the {@link DagManagerThread}s.
+   * @param dag {@link Dag} to be added
+   * @param persist whether to persist the dag to the {@link DagStateStore}
    */
-  synchronized void addDag(Dag<JobExecutionPlan> dag) throws IOException {
-    //Persist the dag
-    this.dagStateStore.writeCheckpoint(dag);
+  synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist) throws IOException {
+    if (persist) {
+      //Persist the dag
+      this.dagStateStore.writeCheckpoint(dag);
+    }
     int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
     // Add the dag to the specific queue determined by flowExecutionId
     // Flow cancellation request has to be forwarded to the same DagManagerThread where the
@@ -306,7 +310,7 @@ public class DagManager extends AbstractIdleService {
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
         for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
-          addDag(dag);
+          addDag(dag, false);
         }
       } else { //Mark the DagManager inactive.
         log.info("Inactivating the DagManager. Shutting down all DagManager threads");
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f987682..7a1b611 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -284,7 +284,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
 
       if (this.dagManager.isPresent()) {
         //Send the dag to the DagManager.
-        this.dagManager.get().addDag(jobExecutionPlanDag);
+        this.dagManager.get().addDag(jobExecutionPlanDag, true);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index fde9b85..ae9877f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -90,9 +90,9 @@ public class DagManagerFlowTest {
         .thenReturn(Collections.singletonList(123456782L));
 
     // mock add spec
-    dagManager.addDag(dag1);
-    dagManager.addDag(dag2);
-    dagManager.addDag(dag3);
+    dagManager.addDag(dag1, true);
+    dagManager.addDag(dag2, true);
+    dagManager.addDag(dag3, true);
 
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -148,7 +148,7 @@ public class DagManagerFlowTest {
         .thenReturn(Collections.singletonList(flowExecutionId));
 
     // mock add spec
-    dagManager.addDag(dag);
+    dagManager.addDag(dag, true);
 
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -192,7 +192,7 @@ public class DagManagerFlowTest {
     dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
 
     // mock add spec
-    dagManager.addDag(dag);
+    dagManager.addDag(dag, true);
 
     // check existence of dag in dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -231,7 +231,7 @@ public class DagManagerFlowTest {
     dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
 
     // mock add spec
-    dagManager.addDag(dag);
+    dagManager.addDag(dag, true);
 
     // check existence of dag in dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).