You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/08 19:16:25 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-950] Avoid persisting dag right after loading it on startup
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 66376ec [GOBBLIN-950] Avoid persisting dag right after loading it on startup
66376ec is described below
commit 66376ec7a57df20adc6cd097da0fffb86bd19f68
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Nov 8 11:16:16 2019 -0800
[GOBBLIN-950] Avoid persisting dag right after loading it on startup
Closes #2798 from jack-moseley/dag-optimizations
---
.../gobblin/service/modules/orchestration/DagManager.java | 14 +++++++++-----
.../service/modules/orchestration/Orchestrator.java | 2 +-
.../service/modules/orchestration/DagManagerFlowTest.java | 12 ++++++------
3 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index dcadc88..06eb277 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -225,13 +225,17 @@ public class DagManager extends AbstractIdleService {
}
/**
- * Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} first persists the
+ * Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} optionally persists the
* submitted dag to the {@link DagStateStore} and then adds the dag to a {@link BlockingQueue} to be picked up
* by one of the {@link DagManagerThread}s.
+ * @param dag {@link Dag} to be added
+ * @param persist whether to persist the dag to the {@link DagStateStore}
*/
- synchronized void addDag(Dag<JobExecutionPlan> dag) throws IOException {
- //Persist the dag
- this.dagStateStore.writeCheckpoint(dag);
+ synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist) throws IOException {
+ if (persist) {
+ //Persist the dag
+ this.dagStateStore.writeCheckpoint(dag);
+ }
int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
// Add the dag to the specific queue determined by flowExecutionId
// Flow cancellation request has to be forwarded to the same DagManagerThread where the
@@ -306,7 +310,7 @@ public class DagManager extends AbstractIdleService {
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
- addDag(dag);
+ addDag(dag, false);
}
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f987682..7a1b611 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -284,7 +284,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
if (this.dagManager.isPresent()) {
//Send the dag to the DagManager.
- this.dagManager.get().addDag(jobExecutionPlanDag);
+ this.dagManager.get().addDag(jobExecutionPlanDag, true);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index fde9b85..ae9877f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -90,9 +90,9 @@ public class DagManagerFlowTest {
.thenReturn(Collections.singletonList(123456782L));
// mock add spec
- dagManager.addDag(dag1);
- dagManager.addDag(dag2);
- dagManager.addDag(dag3);
+ dagManager.addDag(dag1, true);
+ dagManager.addDag(dag2, true);
+ dagManager.addDag(dag3, true);
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -148,7 +148,7 @@ public class DagManagerFlowTest {
.thenReturn(Collections.singletonList(flowExecutionId));
// mock add spec
- dagManager.addDag(dag);
+ dagManager.addDag(dag, true);
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -192,7 +192,7 @@ public class DagManagerFlowTest {
dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
// mock add spec
- dagManager.addDag(dag);
+ dagManager.addDag(dag, true);
// check existence of dag in dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -231,7 +231,7 @@ public class DagManagerFlowTest {
dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
// mock add spec
- dagManager.addDag(dag);
+ dagManager.addDag(dag, true);
// check existence of dag in dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).