Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/06/18 21:06:36 UTC

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

sv2000 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r442480654



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
##########
@@ -91,4 +94,18 @@ public Void apply(JobCatalogListener listener) {
     }
   }
 
+  public static class CancelJobCallback extends Callback<JobCatalogListener, Void> {
+    private final JobSpec _cancelJob;
+
+    public CancelJobCallback(JobSpec cancelJob) {
+      super(Objects.toStringHelper("onCancelJob")
+          .add("cancelJob", cancelJob).toString());
+      _cancelJob = cancelJob;
+    }
+
+    @Override public Void apply(JobCatalogListener listener) {

Review comment:
       Start the method definition on a new line, after the @Override annotation.

##########
File path: gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
##########
@@ -84,6 +83,7 @@ public long bytesWritten() {
       };
 
   private final Producer<K, V> producer;
+  @EqualsAndHashCode.Include

Review comment:
       Why does the equals check only include the topic name and not the other fields?

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -385,6 +400,42 @@ public void remove(Spec spec, Properties headers) throws IOException {
     }
   }
 
+  @Subscribe
+  public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {

Review comment:
       I am a little concerned about the number of hacks we are introducing depending on whether DagManager mode is enabled. I think we should weigh this approach against simply making DagManager the de facto standard code path.

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -293,28 +301,34 @@ public void orchestrate(Spec spec) throws Exception {
         this.dagManager.get().addDag(jobExecutionPlanDag, true);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
+        // This assumes that the JobSpecs do not have any dependency on each other and all can run together
         for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
           DagManagerUtils.incrementJobAttempt(dagNode);
           JobExecutionPlan jobExecutionPlan = dagNode.getValue();
 
           // Run this spec on selected executor
-          SpecProducer producer = null;
+          SpecProducer<Spec> producer = null;
           try {
             producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            Spec jobSpec = jobExecutionPlan.getJobSpec();
+            JobSpec jobSpec = jobExecutionPlan.getJobSpec();
 
-            if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+            if (!jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
               _log.warn("JobSpec does not contain flowExecutionId.");
             }
 
             Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
-            _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
+            _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec.toString(), producer));
 
             TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
                 getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
 
             producer.addSpec(jobSpec);
 
+            if (!specProducerToSpecs.containsKey(producer)) {

Review comment:
       It looks like the specProducerToSpecs map will keep growing unless spec deletion is triggered via a DELETE request. The map should be pruned on job completion, on either success or failure.
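
       One possible shape for the pruning suggested above, as a minimal sketch rather than the PR's actual code: ProducerSpecTracker and its methods are hypothetical names, and the type parameters P and S stand in for SpecProducer and Spec. It assumes some completion hook (success or failure) is available to call onJobFinished.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper, not part of Gobblin: P stands in for a SpecProducer, S for a Spec. */
final class ProducerSpecTracker<P, S> {
  private final Map<P, Set<S>> producerToSpecs = new ConcurrentHashMap<>();

  /** Record a spec once it has been handed to its producer. */
  void track(P producer, S spec) {
    // compute() runs atomically per key, so it cannot race with the pruning below.
    producerToSpecs.compute(producer, (p, specs) -> {
      Set<S> updated = (specs == null) ? ConcurrentHashMap.<S>newKeySet() : specs;
      updated.add(spec);
      return updated;
    });
  }

  /** Call on every terminal state (success, failure, cancellation) so entries do not accumulate. */
  void onJobFinished(P producer, S spec) {
    producerToSpecs.computeIfPresent(producer, (p, specs) -> {
      specs.remove(spec);
      // Returning null removes the producer's entry once its last spec is gone.
      return specs.isEmpty() ? null : specs;
    });
  }

  boolean isTracked(P producer, S spec) {
    Set<S> specs = producerToSpecs.get(producer);
    return specs != null && specs.contains(spec);
  }

  int trackedProducers() {
    return producerToSpecs.size();
  }
}
```

       With this shape, the DELETE path and the completion path can both call onJobFinished, and calling it twice for the same spec is a harmless no-op.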

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -298,24 +296,14 @@ synchronized public void stopDag(URI uri) throws IOException {
   /**
    * Add the specified flow to {@link DagManager#cancelQueue}
    */
-  private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
+  void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
     int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
     String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
     if (!this.cancelQueue[queueId].offer(dagId)) {
       throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
     }
   }
 
-  @Subscribe

Review comment:
       I see that handleKillFlowEvent has now moved to Orchestrator. Shouldn't the Orchestrator now register itself with the EventBus instance to receive these messages?
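
       To illustrate why registration matters, here is a self-contained sketch. It does not use Guava; MiniEventBus and the nested Subscribe annotation are hypothetical stand-ins that mimic the register/post pattern of an annotation-driven event bus, and the Orchestrator and KillFlowEvent classes here are simplified placeholders, not the Gobblin classes. The point it shows: an annotated handler receives nothing until its owner has been registered with the bus.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class SubscribeRegistrationSketch {
  /** Stand-in for an event-bus subscription annotation (hypothetical, not Guava's). */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Subscribe {}

  /** Tiny single-threaded bus: only registered listeners are ever consulted. */
  static final class MiniEventBus {
    private final List<Object> listeners = new ArrayList<>();

    void register(Object listener) {
      listeners.add(listener);
    }

    void post(Object event) {
      for (Object listener : listeners) {
        for (Method m : listener.getClass().getDeclaredMethods()) {
          // Dispatch to annotated one-argument methods whose parameter accepts the event.
          if (m.isAnnotationPresent(Subscribe.class)
              && m.getParameterCount() == 1
              && m.getParameterTypes()[0].isInstance(event)) {
            try {
              m.setAccessible(true);
              m.invoke(listener, event);
            } catch (ReflectiveOperationException e) {
              throw new IllegalStateException("Handler failed: " + m.getName(), e);
            }
          }
        }
      }
    }
  }

  /** Simplified placeholder event. */
  static final class KillFlowEvent {
    final String flowName;

    KillFlowEvent(String flowName) {
      this.flowName = flowName;
    }
  }

  /** Simplified placeholder for the class that now owns the handler. */
  static final class Orchestrator {
    final List<String> killedFlows = new ArrayList<>();

    @Subscribe
    public void handleKillFlowEvent(KillFlowEvent event) {
      killedFlows.add(event.flowName);
    }
  }
}
```

       In this sketch, an event posted before bus.register(orchestrator) is silently dropped; only events posted after registration reach handleKillFlowEvent.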



