Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/11/17 19:26:05 UTC

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3155: [GOBBLIN-1319] fix cancellation in gobblin cluster jobs

arjun4084346 commented on a change in pull request #3155:
URL: https://github.com/apache/incubator-gobblin/pull/3155#discussion_r525428377



##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
##########
@@ -367,6 +369,33 @@ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobAr
     }
   }
 
+  @Subscribe
+  public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
+      throws InterruptedException {
+    String jobUri = cancelJobArrival.getJoburi();
+    LOGGER.info("Received cancel for job configuration of job " + jobUri);
+    Optional<String> planningJob = Optional.empty();
+    Optional<String> actualJob = Optional.empty();
+
+    try {
+      planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+      actualJob = this.jobsMapping.getActualJobId(jobUri);
+    } catch (IOException e) {
+      LOGGER.warn("Planning and actual jobs could not be retrieved for job {}", jobUri);
+      return;
+    }
+
+    if (planningJob.isPresent()) {
+      LOGGER.info("Cancelling planning job helix workflow: {}", planningJob.get());
+      new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJob.get(), this.helixJobStopTimeoutMillis);
+    }
+
+    if (actualJob.isPresent()) {
+      LOGGER.info("Cancelling actual job helix workflow: {}", actualJob.get());
+      new TaskDriver(this.jobHelixManager).waitToStop(actualJob.get(), this.helixJobStopTimeoutMillis);
+    }

Review comment:
      1) The problem is in the `HelixUtils.getWorkflowIdsFromJobNames` method.
   It uses `ConfigurationKeys.JOB_NAME_KEY` (`job.name`) to identify the Helix workflows for the provided jobs. However, for planning jobs, `job.name` holds the name of the original job, not the ID returned by `planningJob.get()`.
   The Helix jobs' configs do have a field (`PLANNING_ID_KEY`) that stores the value of `planningJob.get()`, but I cannot use that field because I wanted to keep the `cancelJobIfRequired()` method usable for the rest of the Helix jobs.
   
   2) There is no side effect, but yes, it can be removed from here.
   
   3) `HelixRetriggeringJobCallable.deleteJobSpec()` calls `jobCatalog.remove()` after it submits the job to Helix. I want that removal to be distinguishable from the DELETE request coming from GaaS, which is meant for cancellation (including deletion). I added a comment explaining this in `NonObservingFSJobCatalog`.
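To illustrate point 1, here is a minimal, self-contained sketch. It does not use the Helix API: workflow configs are modeled as plain maps, and the `planning.id` key string, the workflow IDs, and the planning job ID are hypothetical stand-ins (only `job.name` comes from the comment above). It shows why a lookup on `job.name` misses a planning workflow when given the planning job ID, while a lookup on the planning-ID field finds it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: Helix workflow configs are modeled as workflowId -> config map.
// "job.name" mirrors ConfigurationKeys.JOB_NAME_KEY; "planning.id" is an assumed
// stand-in for PLANNING_ID_KEY, whose real value is not shown in this thread.
public class WorkflowLookupSketch {
  static final String JOB_NAME_KEY = "job.name";
  static final String PLANNING_ID_KEY = "planning.id";

  /** Returns the IDs of all workflows whose config maps {@code key} to {@code value}. */
  static Set<String> findWorkflows(Map<String, Map<String, String>> workflows, String key, String value) {
    return workflows.entrySet().stream()
        .filter(e -> value.equals(e.getValue().get(key)))
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> workflows = new HashMap<>();
    // A planning workflow: job.name holds the ORIGINAL job's name, not the planning job ID.
    workflows.put("wf-planning-1",
        Map.of(JOB_NAME_KEY, "myJob", PLANNING_ID_KEY, "job_myJob_planning_123"));
    // An ordinary Helix job: it has job.name but no planning-ID field.
    workflows.put("wf-other", Map.of(JOB_NAME_KEY, "otherJob"));

    String planningJobId = "job_myJob_planning_123"; // hypothetical value of planningJob.get()
    System.out.println(findWorkflows(workflows, JOB_NAME_KEY, planningJobId));    // []
    System.out.println(findWorkflows(workflows, PLANNING_ID_KEY, planningJobId)); // [wf-planning-1]
    System.out.println(findWorkflows(workflows, JOB_NAME_KEY, "otherJob"));       // [wf-other]
    System.out.println(findWorkflows(workflows, PLANNING_ID_KEY, "otherJob"));    // []
  }
}
```

The trade-off from point 1 is visible in the output: the `job.name` lookup works for every Helix job but cannot find a planning workflow from its ID, while the planning-ID lookup finds it but has nothing to match on for ordinary jobs.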

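Point 3 can be sketched the same way. The class, the `RemovalReason` enum, and the `job:` URI scheme below are hypothetical and are not Gobblin's actual `JobCatalog` API; the sketch only shows the idea of tagging each removal with its cause, so that cleanup after a job is submitted to Helix does not take the cancellation path that a DELETE from GaaS should.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: all names here are illustrative, not Gobblin's API.
public class CatalogRemovalSketch {
  /** Why a job spec is being removed from the catalog. */
  enum RemovalReason {
    /** Internal cleanup after the job has been handed to Helix; the job must keep running. */
    CLEANUP_AFTER_SUBMISSION,
    /** DELETE request from GaaS; the running job must be cancelled as well. */
    CANCELLATION
  }

  private final Map<URI, String> specs = new HashMap<>();
  private final List<URI> cancelledJobs = new ArrayList<>();

  void put(URI uri, String spec) {
    specs.put(uri, spec);
  }

  void remove(URI uri, RemovalReason reason) {
    specs.remove(uri);
    // Only a GaaS-originated delete should trigger cancellation of the running job.
    if (reason == RemovalReason.CANCELLATION) {
      cancelledJobs.add(uri); // stand-in for posting a cancel event to the scheduler
    }
  }

  boolean contains(URI uri) {
    return specs.containsKey(uri);
  }

  List<URI> cancelledJobs() {
    return Collections.unmodifiableList(cancelledJobs);
  }

  public static void main(String[] args) {
    CatalogRemovalSketch catalog = new CatalogRemovalSketch();
    URI a = URI.create("job:/jobA");
    URI b = URI.create("job:/jobB");
    catalog.put(a, "specA");
    catalog.put(b, "specB");
    catalog.remove(a, RemovalReason.CLEANUP_AFTER_SUBMISSION); // like deleteJobSpec() after submission
    catalog.remove(b, RemovalReason.CANCELLATION);             // like a DELETE from GaaS
    System.out.println(catalog.cancelledJobs()); // [job:/jobB]
  }
}
```

Both specs leave the catalog, but only the GaaS-style removal is recorded as a cancellation. How the real catalog separates the two paths is not shown in this thread.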


