Posted to dev@gobblin.apache.org by "ZihanLi58 (via GitHub)" <gi...@apache.org> on 2023/06/27 20:53:12 UTC

[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

ZihanLi58 commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244336768


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   1. handleUpdateJobConfigArrival calls handleDeleteJobConfigArrival directly. So if you want to reset the timer specifically on delete, remember to distinguish the two call paths here: one comes from handleUpdateJobConfigArrival, the other is called directly.
   2. Resetting the timer only helps if you change all the cancel APIs in our code to send a delete-job message that triggers this method; in that case the reset lets us start a new job immediately. Otherwise there is little point in doing 1, since we don't explicitly delete anyway.
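
To make point 1 concrete, here is a rough sketch of one way to separate the two call paths. It is not the code in this PR: the class, the method names, and the `resetThrottle` flag are all hypothetical, and the map is only assumed to hold a next-allowed timestamp per job name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the scheduler; none of these names are Gobblin APIs.
class ThrottleResetSketch {
  // Assumed shape: job name -> earliest time (epoch millis) the job may be scheduled again.
  private final Map<String, Long> nextSchedulableTime = new ConcurrentHashMap<>();

  void recordLaunch(String jobName, long nextAllowedMillis) {
    nextSchedulableTime.put(jobName, nextAllowedMillis);
  }

  boolean isThrottled(String jobName, long nowMillis) {
    Long next = nextSchedulableTime.get(jobName);
    return next != null && nowMillis < next;
  }

  // Path A: an explicit delete message. Clear the timer so a new job can start right away.
  synchronized void onDeleteMessage(String jobName) {
    deleteJob(jobName, true);
  }

  // Path B: an update, which deletes and then reschedules. Keep the timer in place.
  synchronized void onUpdateMessage(String jobName) {
    deleteJob(jobName, false);
    // Rescheduling with the new config would happen here (omitted).
  }

  // Shared delete logic; the flag records which path the caller came from.
  private void deleteJob(String jobName, boolean resetThrottle) {
    // Cancelling and unscheduling the job would happen here (omitted).
    if (resetThrottle) {
      nextSchedulableTime.remove(jobName);
    }
  }
}
```

With this shape, an update leaves the job throttled while an explicit delete clears it. Whether that is worth doing still depends on point 2.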



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {

Review Comment:
   @homatthew are we sure this change won't hurt performance when these message-handling methods are called frequently? (That's why I initially suggested a job-level lock.)
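
For reference, a minimal sketch of what a job-level lock could look like, so that handlers for different jobs do not block each other. The class and method names are hypothetical and this is not taken from the PR; it only uses `ConcurrentHashMap.computeIfAbsent` and `ReentrantLock` from the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper; not part of Gobblin.
class PerJobLockSketch {
  // One lock per job name, created lazily and atomically on first use.
  private final ConcurrentMap<String, ReentrantLock> jobLocks = new ConcurrentHashMap<>();

  // Runs the action while holding only the lock for the given job.
  void runExclusively(String jobName, Runnable action) {
    ReentrantLock lock = jobLocks.computeIfAbsent(jobName, k -> new ReentrantLock());
    lock.lock();
    try {
      action.run();
    } finally {
      lock.unlock();
    }
  }

  // True if the calling thread currently holds the lock for this job.
  boolean heldByCurrentThread(String jobName) {
    ReentrantLock lock = jobLocks.get(jobName);
    return lock != null && lock.isHeldByCurrentThread();
  }
}
```

The lock is reentrant, so an update handler that calls the delete handler for the same job on the same thread would not deadlock. One caveat of this sketch: entries are never removed from the map, so it grows with the number of distinct job names.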


