Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/04/27 19:05:42 UTC

[GitHub] [gobblin] umustafi commented on a change in pull request #3268: [GOBBLIN-1437] Segregate FlowConfigs/Delete and FlowExecutions/Delete functions

umustafi commented on a change in pull request #3268:
URL: https://github.com/apache/gobblin/pull/3268#discussion_r621514135



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -365,38 +365,42 @@ public void remove(Spec spec, Properties headers) throws IOException {
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
+      //Send the dag to the DagManager to stop it.
+      //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
         _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
         this.dagManager.get().stopDag(spec.getUri());
-      } else {
-        // If DagManager is not enabled, we need to recompile the flow to find the spec producer,
-        // If compilation results is different, it remove request can go to some different spec producer
-        Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
-        if (jobExecutionPlanDag.isEmpty()) {
-          _log.warn("Cannot determine an executor to delete Spec: " + spec);
-          return;
-        }
-
-        // Delete all compiled JobSpecs on their respective Executor
-        for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
-          JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-          Spec jobSpec = jobExecutionPlan.getJobSpec();
-          try {
-            SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
-            producer.deleteSpec(jobSpec.getUri(), headers);
-          } catch (Exception e) {
-            _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
-          }
-        }
       }
+      // We need to recompile the flow to find the spec producer,
+      // If compilation results is different, it remove request can go to some different spec producer

Review comment:
       Small nitpick here: "it" should be "it's".



