Posted to dev@gobblin.apache.org by "Peiyingy (via GitHub)" <gi...@apache.org> on 2023/06/14 18:01:21 UTC

[GitHub] [gobblin] Peiyingy opened a new pull request, #3704: Py helix scheduler throttle gobblin 1840

Peiyingy opened a new pull request, #3704:
URL: https://github.com/apache/gobblin/pull/3704

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-1840] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1840
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   #### Problem Statement
   Currently, Azkaban jobs can be triggered at the same time, which causes the Helix replanner to run two or more times within a short time span. Creating a replanner is expensive: it consumes a lot of resources and takes a long time on both ZooKeeper and the Application Master.
   
   #### Solution
   We added a concurrent hashmap that stores the creation time of each job. Before rescheduling a workflow, we check this record and reschedule only if the last replanning happened longer ago than the throttle timeout, which defaults to one hour and is configurable. Throttling can also be turned off, which disables this early-return mechanism.
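
A minimal sketch of the check described above, not the code in this PR. The class `ReplanThrottle` and its method `tryReschedule` are hypothetical names, and treating an elapsed time exactly equal to the timeout as allowed is an assumption.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration only; it is not the class changed in this PR.
class ReplanThrottle {
  private final ConcurrentHashMap<String, Instant> jobStartTimeMap = new ConcurrentHashMap<>();
  private final Duration throttleTimeout;
  private final boolean throttleEnabled;
  private final Clock clock;

  ReplanThrottle(Duration throttleTimeout, boolean throttleEnabled, Clock clock) {
    this.throttleTimeout = throttleTimeout;
    this.throttleEnabled = throttleEnabled;
    this.clock = clock;
  }

  /** Returns true if the job may be rescheduled now; records the time when it is allowed. */
  boolean tryReschedule(String jobName) {
    Instant now = clock.instant();
    if (!throttleEnabled) {
      // Throttling is off: always allow, but still remember the latest time.
      jobStartTimeMap.put(jobName, now);
      return true;
    }
    boolean[] allowed = {false};
    // compute() runs atomically per key, so two concurrent triggers cannot both pass.
    jobStartTimeMap.compute(jobName, (name, last) -> {
      // Assumption: an elapsed time equal to the timeout counts as outside the window.
      if (last == null || Duration.between(last, now).compareTo(throttleTimeout) >= 0) {
        allowed[0] = true;
        return now;
      }
      return last;
    });
    return allowed[0];
  }
}

class ReplanThrottleDemo {
  public static void main(String[] args) {
    Clock fixed = Clock.fixed(Instant.parse("2023-06-14T18:00:00Z"), ZoneOffset.UTC);
    ReplanThrottle throttle = new ReplanThrottle(Duration.ofHours(1), true, fixed);
    System.out.println(throttle.tryReschedule("jobA")); // true: no earlier record
    System.out.println(throttle.tryReschedule("jobA")); // false: still inside the one-hour window
  }
}
```

Passing a `Clock` into the constructor, rather than calling `Instant.now()` directly, is what lets a test control the time seen by the check.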
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   <img width="333" alt="Screenshot 2023-06-14 at 11 00 50 AM" src="https://github.com/apache/gobblin/assets/112960226/098448b9-ee6d-4f7f-9672-d51ad7f62b48">
   
   The unit tests cover the permutations of three variables: same or different workflow, time span (within or exceeding the throttle timeout), and whether throttling is enabled. The original testNewJobAndUpdate covers the same workflow, a long time period, and throttling enabled. The remaining tests have descriptive names.
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: Py helix scheduler throttle gobblin 1840

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1229997117


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,14 +113,16 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;
 
   public GobblinHelixJobScheduler(Config sysConfig,
-                                  HelixManager jobHelixManager,
-                                  Optional<HelixManager> taskDriverHelixManager,
-                                  EventBus eventBus,
-                                  Path appWorkDir, List<? extends Tag<?>> metadataTags,
-                                  SchedulerService schedulerService,
-                                  MutableJobCatalog jobCatalog) throws Exception {
+      HelixManager jobHelixManager,

Review Comment:
   In general, the autoformatter we use is described at https://gobblin.apache.org/docs/developer-guide/CodingStyle/





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231313269


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   The default is provided by GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY in the config file, which is set to false. Should I also add an explicit fallback default at this call site?
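
For illustration, a call-site fallback of the kind asked about could look like the sketch below. It uses plain `java.util.Properties` rather than Gobblin's `PropertiesUtils`, and the key string and default constant are hypothetical placeholders, not the values defined in `GobblinClusterConfigurationKeys`.

```java
import java.util.Properties;

class ThrottleFlagExample {
  // Hypothetical key and default, used only for this sketch.
  static final String THROTTLE_ENABLED_KEY = "example.helix.job.scheduling.throttle.enabled";
  static final String DEFAULT_THROTTLE_ENABLED = "false";

  /** Reads the flag and falls back to the default when the key is absent. */
  static boolean isThrottleEnabled(Properties jobConfig) {
    return Boolean.parseBoolean(jobConfig.getProperty(THROTTLE_ENABLED_KEY, DEFAULT_THROTTLE_ENABLED));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(isThrottleEnabled(props)); // false: key missing, default applies
    props.setProperty(THROTTLE_ENABLED_KEY, "true");
    System.out.println(isThrottleEnabled(props)); // true: explicit value wins
  }
}
```

With an explicit fallback, the lookup behaves the same whether or not the config file supplies the key.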





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244470463


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -522,7 +534,7 @@ public void run() {
         GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
         GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
       } catch (JobException je) {
-        LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+        LOGGER.error("Failed to schedule or run job to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);

Review Comment:
   Typo / wording. `schedule or run job to run job`



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, String jobName, HelixManager helixManager) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(jobName, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (String jobName, HelixManager helixManager)
+      throws Exception {
+    // Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(jobName));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,
+    String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+    String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean isSameWorkflow) throws Exception {
+    Clock mockClock = Mockito.mock(Clock.class);
+    AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+    when(mockClock.instant()).thenAnswer(invocation -> nextInstant.getAndAccumulate(nextInstant.get(), (currentInstant, x) -> currentInstant.plus(mockedPeriod)));

Review Comment:
   nextInstant.get() is not used and is just a placeholder, right? Since it is only a placeholder value, you could pass something like null instead.
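
As a sketch of an alternative that needs no placeholder argument at all, `AtomicReference.getAndUpdate` takes only a unary function. The `SteppingClock` class below is a hypothetical test helper, not part of this PR or of Mockito; it returns the current instant and then advances it by a fixed step on every call.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical test helper: each instant() call returns the current value, then advances by `step`.
class SteppingClock extends Clock {
  private final AtomicReference<Instant> next;
  private final Duration step;
  private final ZoneId zone;

  SteppingClock(Instant start, Duration step) {
    this(new AtomicReference<>(start), step, ZoneOffset.UTC);
  }

  private SteppingClock(AtomicReference<Instant> next, Duration step, ZoneId zone) {
    this.next = next;
    this.step = step;
    this.zone = zone;
  }

  @Override
  public ZoneId getZone() {
    return zone;
  }

  @Override
  public Clock withZone(ZoneId newZone) {
    // Shares the same underlying instant so both views advance together.
    return new SteppingClock(next, step, newZone);
  }

  @Override
  public Instant instant() {
    // getAndUpdate needs only a unary function, so no unused second argument is required.
    return next.getAndUpdate(current -> current.plus(step));
  }
}

class SteppingClockDemo {
  public static void main(String[] args) {
    Clock clock = new SteppingClock(Instant.parse("2023-06-14T18:00:00Z"), Duration.ofHours(2));
    System.out.println(clock.instant()); // 2023-06-14T18:00:00Z
    System.out.println(clock.instant()); // 2023-06-14T20:00:00Z
  }
}
```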





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244481162


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,192 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, String jobName, HelixManager helixManager) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(jobName, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (String jobName, HelixManager helixManager)
+      throws Exception {
+    // Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(jobName));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Duration mockedPeriod, String jobSuffix,

Review Comment:
   A Javadoc describing these parameters would be helpful so that future contributors can use this method.
   
   Also, mockedPeriod is a slightly odd name, since you're now using it as the step amount applied each time clock.instant() is called.
   
   Is this really necessary? Your original implementation simply returned a final Instant defined at the beginning, which was easier to reason about. Now we rely on how many times clock.instant() has been called to know what the current time is.
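
One possible way to avoid depending on the number of `clock.instant()` calls, sketched here as an assumption rather than what the PR adopted, is a clock whose time changes only when the test advances it explicitly. `ManualClock` and its `advance` method are hypothetical names.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical test helper: time moves only when the test calls advance().
class ManualClock extends Clock {
  private final AtomicReference<Instant> now;
  private final ZoneId zone;

  ManualClock(Instant start) {
    this(new AtomicReference<>(start), ZoneOffset.UTC);
  }

  private ManualClock(AtomicReference<Instant> now, ZoneId zone) {
    this.now = now;
    this.zone = zone;
  }

  /** Moves the clock forward by the given amount. */
  void advance(Duration amount) {
    now.updateAndGet(current -> current.plus(amount));
  }

  @Override
  public ZoneId getZone() {
    return zone;
  }

  @Override
  public Clock withZone(ZoneId newZone) {
    // Shares the same underlying instant with the original clock.
    return new ManualClock(now, newZone);
  }

  @Override
  public Instant instant() {
    // Reading the time has no side effect, however many times it is called.
    return now.get();
  }
}

class ManualClockDemo {
  public static void main(String[] args) {
    ManualClock clock = new ManualClock(Instant.parse("2023-06-14T18:00:00Z"));
    System.out.println(clock.instant()); // 2023-06-14T18:00:00Z
    System.out.println(clock.instant()); // 2023-06-14T18:00:00Z (unchanged)
    clock.advance(Duration.ofHours(2));   // the test decides when time passes
    System.out.println(clock.instant()); // 2023-06-14T20:00:00Z
  }
}
```

In a test, the new-job event would be handled first, then `advance` called with either a period inside or beyond the throttle timeout, and then the update event handled.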





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231416602


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory

Review Comment:
   I've added inline comments. I'm not sure which Javadoc you mean; where else should I add comments?





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231294898


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory

Review Comment:
   The original approach was to share one HelixManager across tests, but as the number of tests grew it caused the error "HelixManager (ZkClient) is not connected", so I changed it to a local variable to avoid this problem.





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231305948


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true")).resolve();
 
-    String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
 
-    this.helixManager = HelixManagerFactory
+    HelixManager helixManager = HelixManagerFactory

Review Comment:
   Yes, this part is no longer required now that helixManager is not a global variable. I'll remove it.





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231416602


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory

Review Comment:
   I've added inline comments. I'm not sure which Javadoc I should also add comments to.





[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231628803


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean isThrottleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (isThrottleEnabled && this.jobNameToStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobNameToStartTimeMap.get(jobName);
+      Duration workflowRunningDuration = Duration.between(jobStartTime, Instant.now());
+      if (workflowRunningDuration.minus(throttleTimeoutDurationSecs).isNegative()) {
+        LOGGER.info("Replanning is skipped for job {} ", jobName);

Review Comment:
   nit: can add "skipped due to job being started close to another start" or "due to scheduling throttle enabled..."



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()

Review Comment:
   nice work in updating all the tests



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +55,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.*;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
+
+/**
+ * In all test cases, we use HelixManager as a local variable to avoid

Review Comment:
   nice helpful description, let's put this with the other comment above in one block





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1245763554


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,79 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info("{} finished preparing. The next schedulable time is {}", jobContext.getJobName(), nextSchedulableTime);
+  }
+
+  @Override
+  public void onJobStart(JobContext jobContext)
+      throws Exception {
+    super.onJobStart(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info("{} has started. The next schedulable time is {}", jobContext.getJobName(), nextSchedulableTime);
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
+      LOG.info("{} failed. The next schedulable time is {} so that any future schedule attempts will be allowed.",

Review Comment:
   "{} failed." -> "{} completed."?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -303,36 +342,66 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
   }
 
   @Subscribe
-  public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
-    String jobUri = newJobArrival.getJobName();
-    LOGGER.info("Received new job configuration of job " + jobUri);
+  public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
+    String jobName = newJobArrival.getJobName();
+    LOGGER.info("Received new job configuration of job " + jobName);
+
+    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
+    if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
+      LOGGER.info("Adding new job is skipped for job {}. Current time is {} and the next schedulable time would be {}",
+          jobName,
+          clock.instant(),
+          nextSchedulableTime
+      );
+      return;
+    }
+    nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout);

Review Comment:
   Should we only add an entry to jobNameToNextSchedulableTime when the throttle is enabled? It is a hash map, so we can easily end up with a memory leak if we do not delete entries properly.
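
   A minimal sketch of that suggestion, assuming a simplified stand-in for the scheduler. The class `ThrottleBookkeepingSketch` and its methods `tryAcquire`, `forget` and `trackedJobs` are hypothetical names used for illustration only, not Gobblin APIs. Nothing is written to the map while throttling is disabled, and entries are removed explicitly so the map cannot grow without bound.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the scheduler's throttle bookkeeping; not Gobblin code.
class ThrottleBookkeepingSketch {
  private final ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime = new ConcurrentHashMap<>();
  private final boolean isThrottleEnabled;
  private final Duration throttleTimeout;
  private final Clock clock;

  ThrottleBookkeepingSketch(boolean isThrottleEnabled, Duration throttleTimeout, Clock clock) {
    this.isThrottleEnabled = isThrottleEnabled;
    this.throttleTimeout = throttleTimeout;
    this.clock = clock;
  }

  /** Returns true if the job may be scheduled now. Only touches the map when throttling is enabled. */
  boolean tryAcquire(String jobName) {
    if (!isThrottleEnabled) {
      return true; // no entry is stored, so the map stays empty
    }
    Instant now = clock.instant();
    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
    if (now.isBefore(nextSchedulableTime)) {
      return false; // still inside the throttle window
    }
    jobNameToNextSchedulableTime.put(jobName, now.plus(throttleTimeout));
    return true;
  }

  /** Drops the entry, for example when a job is deleted, so stale names do not accumulate. */
  void forget(String jobName) {
    jobNameToNextSchedulableTime.remove(jobName);
  }

  int trackedJobs() {
    return jobNameToNextSchedulableTime.size();
  }

  public static void main(String[] args) {
    Clock clock = Clock.fixed(Instant.parse("2023-06-14T18:00:00Z"), ZoneOffset.UTC);
    ThrottleBookkeepingSketch disabled = new ThrottleBookkeepingSketch(false, Duration.ofHours(1), clock);
    disabled.tryAcquire("jobA");
    System.out.println("tracked with throttle off: " + disabled.trackedJobs());

    ThrottleBookkeepingSketch enabled = new ThrottleBookkeepingSketch(true, Duration.ofHours(1), clock);
    System.out.println("first attempt allowed: " + enabled.tryAcquire("jobA"));
    System.out.println("second attempt allowed: " + enabled.tryAcquire("jobA"));
  }
}
```

   Whether removal belongs in the delete handler or in a completion listener is a design choice the sketch leaves open.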



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,79 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)

Review Comment:
   Why do we update the schedulable time three times for the same job: once when we handle the message, once when we prepare the job, and once when the job starts? This makes the log confusing to read.
   If the concern is race conditions while handling messages, can we update it only when handling a message?





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244406476


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -357,19 +370,22 @@ public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);

Review Comment:
   Update this log to say that you are resetting the clock



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -264,59 +264,61 @@ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properti
     return newJobConfigArrivalEvent;
   }
 
-  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+  private void connectAndAssertWorkflowId(String expectedSuffix, String jobName, HelixManager helixManager) throws Exception {
     helixManager.connect();
-    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    String workFlowId = getWorkflowID(jobName, helixManager);
     Assert.assertNotNull(workFlowId);
     Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
   }
 
-  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager)
+  private String getWorkflowID (String jobName, HelixManager helixManager)
       throws Exception {
     // Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
     Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
       try{
         workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
-            Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+            Collections.singletonList(jobName));
       } catch(GobblinHelixUnexpectedStateException e){
         continue;
       }
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
     return null;
   }
 
-  private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+  private void runWorkflowTest(Duration mockedTime, String jobSuffix,

Review Comment:
   Does `mockedTime` still make sense? Seems like it now represents a step duration for incrementing the clock forward in time



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -357,19 +370,22 @@ public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
+      jobNameToNextSchedulableTime.put(jobUri, Instant.EPOCH);
     }
   }
 
   @Subscribe
   public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
-    String jobName = updateJobArrival.getJobName();
+    String jobUri = updateJobArrival.getJobName();

Review Comment:
   Let's not mix up the usage of job uri and job name. If you are gonna use job name (e.g. jobNameToNextSchedulableTime), then use the term job name everywhere. And if you are gonna use job uri, then change it for all of them 



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -264,59 +264,61 @@ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properti
     return newJobConfigArrivalEvent;
   }
 
-  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+  private void connectAndAssertWorkflowId(String expectedSuffix, String jobName, HelixManager helixManager) throws Exception {
     helixManager.connect();
-    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    String workFlowId = getWorkflowID(jobName, helixManager);
     Assert.assertNotNull(workFlowId);
     Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
   }
 
-  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager)
+  private String getWorkflowID (String jobName, HelixManager helixManager)
       throws Exception {
     // Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
     Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
       try{
         workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
-            Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+            Collections.singletonList(jobName));
       } catch(GobblinHelixUnexpectedStateException e){
         continue;
       }
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
     return null;
   }
 
-  private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+  private void runWorkflowTest(Duration mockedTime, String jobSuffix,
     String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
     String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean isSameWorkflow) throws Exception {
     Clock mockClock = Mockito.mock(Clock.class);
-    AtomicInteger count = new AtomicInteger(0);
-    when(mockClock.instant()).thenAnswer(invocation -> count.getAndIncrement() == 0 ? beginTime : mockedTime);
+    AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+    when(mockClock.instant()).thenAnswer(invocation -> {
+      Instant currentInstant = nextInstant.get();
+      nextInstant.set(currentInstant.plus(mockedTime));

Review Comment:
   I noticed that you're using AtomicReference. But you are not doing an atomic get and set, which basically defeats the point of what you're doing. 
   
   Did you mean to do something like `getAndAccumulate`?
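
   A minimal sketch of the atomic variant, assuming a hypothetical helper class `SteppingInstantSource` that is not part of the PR. `getAndAccumulate` on an `AtomicReference<Instant>` expects its operand to be an `Instant` as well, so the sketch uses `getAndUpdate`, which performs the same read-modify-write atomically with a `Duration` step.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: hands out instants that advance by a fixed step on every call.
class SteppingInstantSource {
  private final AtomicReference<Instant> next;
  private final Duration step;

  SteppingInstantSource(Instant begin, Duration step) {
    this.next = new AtomicReference<>(begin);
    this.step = step;
  }

  /** Returns the stored instant and advances it by one step as a single atomic operation. */
  Instant nextInstant() {
    // Unlike a separate get() followed by set(), no other thread can observe
    // or overwrite the value between the read and the write.
    return next.getAndUpdate(current -> current.plus(step));
  }

  public static void main(String[] args) {
    SteppingInstantSource source = new SteppingInstantSource(Instant.EPOCH, Duration.ofMinutes(1));
    System.out.println(source.nextInstant()); // the begin instant
    System.out.println(source.nextInstant()); // the begin instant plus one step
  }
}
```

   In the test, the mocked `Clock.instant()` answer could delegate to a method like `nextInstant()`. That wiring is left out here to keep the sketch free of Mockito.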





[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231362760


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   But you are not using that value here, right? You want to pass `DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY` here so that the default value is actually used.
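
   A minimal sketch of the pattern using only `java.util.Properties`. The key string and default in `ThrottleFlagSketch` are hypothetical placeholders, not the real Gobblin constants, and the actual code goes through `PropertiesUtils.getPropAsBoolean` rather than this helper.

```java
import java.util.Properties;

// Hypothetical illustration of reading a boolean flag with an explicit default.
class ThrottleFlagSketch {
  // Placeholder key and default; the real values live in GobblinClusterConfigurationKeys.
  static final String THROTTLE_ENABLED_KEY = "example.throttle.enabled";
  static final boolean DEFAULT_THROTTLE_ENABLED = false;

  /** Falls back to the declared default when the job config does not set the key. */
  static boolean isThrottleEnabled(Properties jobConfig) {
    String raw = jobConfig.getProperty(THROTTLE_ENABLED_KEY, String.valueOf(DEFAULT_THROTTLE_ENABLED));
    return Boolean.parseBoolean(raw);
  }

  public static void main(String[] args) {
    Properties jobConfig = new Properties();
    System.out.println("unset: " + isThrottleEnabled(jobConfig));
    jobConfig.setProperty(THROTTLE_ENABLED_KEY, "true");
    System.out.println("set to true: " + isThrottleEnabled(jobConfig));
  }
}
```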





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1243021714


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);

Review Comment:
   Use `@Slf4j` to add the logger



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );

Review Comment:
   Use `{}` syntax for logging object info





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244464916


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {

Review Comment:
   Summary of offline discussion:
   > What kind of throughput are we expecting with this job launcher? I.e., for fliptop I know the traffic is bursty, but how bursty is it? What sort of magnitude are we talking about here?
   
   > We have 100k jobs submitted throughout the day, so around 1~2 per second. Job cancellation can be triggered at random times but should be much less frequent.
   
   Since the only blocking operation in the critical section is the delete operation (which usually takes seconds to complete), and deletes are infrequent, we can go ahead with the change and add fine-grained locking in the future if necessary.
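
If fine-grained locking does become necessary, one possible shape is a lock per job name instead of `synchronized` on the whole scheduler. The sketch below is an assumption, not code from this PR: `PerJobLocks`, `withJobLock`, and `trackedJobs` are hypothetical names, and only JDK classes are used.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical helper: serializes work per job name rather than per scheduler instance. */
final class PerJobLocks {
  // One monitor object per job name, created lazily on first use.
  private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

  /** Runs the action while holding only the lock that belongs to jobName. */
  <T> T withJobLock(String jobName, Supplier<T> action) {
    Object lock = locks.computeIfAbsent(jobName, k -> new Object());
    synchronized (lock) {
      return action.get();
    }
  }

  /** Number of job names that currently have a lock object. */
  int trackedJobs() {
    return locks.size();
  }
}
```

With this shape, an update for one job would not wait on a slow delete for another job. Note that the sketch never removes lock objects, so a real version would need some eviction policy for finished jobs.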





[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231363962


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory

Review Comment:
   I see. Let's add a comment explaining that in the Javadoc for this test class, and add a method that creates the `HelixManager` so you can reuse it across tests. You can also put a comment there explaining why you changed it to a local variable, so the knowledge is preserved for those updating the tests in the future.





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: Py helix scheduler throttle gobblin 1840

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1229998645


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true")).resolve();
 
-    String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
 
-    this.helixManager = HelixManagerFactory
+    HelixManager helixManager = HelixManagerFactory

Review Comment:
   Why do we create a local variable helixManager here? Seems like it's not used aside from connecting





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244336768


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   1. `handleUpdateJobConfigArrival` calls `handleDeleteJobConfigArrival` directly. So if you want to reset the timer specifically on delete, remember to distinguish the two call paths here: one goes through `handleUpdateJobConfigArrival` and the other is invoked directly.
   2. Resetting the timer only pays off if you change all the cancel APIs in our code to send a delete-job message that triggers this method; in that case, the reset lets us start a new job immediately. Otherwise there is little point in doing 1, as we don't explicitly delete anyway.
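
One way to keep the two call paths apart is to route both through a private helper that takes a flag. This is only a sketch under assumptions: `DeletePathSketch`, `deleteInternal`, and `resetThrottle` are hypothetical names, and the real cancel and reschedule logic is reduced to comments.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: only an explicit delete clears the throttle timer. */
final class DeletePathSketch {
  private final Map<String, Instant> jobNameToNextSchedulableTime = new ConcurrentHashMap<>();

  void recordNextSchedulableTime(String jobName, Instant next) {
    jobNameToNextSchedulableTime.put(jobName, next);
  }

  /** Entry point for an explicit delete message: cancel and reset the timer. */
  synchronized void handleDelete(String jobName) {
    deleteInternal(jobName, true);
  }

  /** Entry point for an update: delete, then reschedule, keeping the timer. */
  synchronized void handleUpdate(String jobName) {
    deleteInternal(jobName, false);
    // Rescheduling the job would follow here.
  }

  private void deleteInternal(String jobName, boolean resetThrottle) {
    // Cancelling the running workflow would happen here.
    if (resetThrottle) {
      jobNameToNextSchedulableTime.remove(jobName);
    }
  }

  boolean hasTimer(String jobName) {
    return jobNameToNextSchedulableTime.containsKey(jobName);
  }
}
```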



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {

Review Comment:
   @homatthew are we sure this change won't hurt performance when these message-handling methods are called frequently? (That's why I initially suggested a job-level lock.)





[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231361824


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   I don't have a strong preference between `Instant` and `Timestamp`/`Long`; the latter are more common on the GaaS side, so I was initially surprised. It's more important to update the map name.





[GitHub] [gobblin] codecov-commenter commented on pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#issuecomment-1593587610

   ## [Codecov](https://app.codecov.io/gh/apache/gobblin/pull/3704?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#3704](https://app.codecov.io/gh/apache/gobblin/pull/3704?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (0e7ba4d) into [master](https://app.codecov.io/gh/apache/gobblin/commit/51a852d506b749b9ac33568aff47105e14972a57?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (51a852d) will **decrease** coverage by `2.15%`.
   > The diff coverage is `100.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3704      +/-   ##
   ============================================
   - Coverage     46.97%   44.82%   -2.15%     
   + Complexity    10794     2104    -8690     
   ============================================
     Files          2138      411    -1727     
     Lines         84132    17750   -66382     
     Branches       9356     2165    -7191     
   ============================================
   - Hits          39518     7957   -31561     
   + Misses        41015     8936   -32079     
   + Partials       3599      857    -2742     
   ```
   
   
   | [Impacted Files](https://app.codecov.io/gh/apache/gobblin/pull/3704?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...bblin/cluster/GobblinClusterConfigurationKeys.java](https://app.codecov.io/gh/apache/gobblin/pull/3704?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJDb25maWd1cmF0aW9uS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://app.codecov.io/gh/apache/gobblin/pull/3704?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `57.89% <100.00%> (+3.65%)` | :arrow_up: |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://app.codecov.io/gh/apache/gobblin/pull/3704?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `50.00% <100.00%> (+2.48%)` | :arrow_up: |
   
   ... and [1735 files with indirect coverage changes](https://app.codecov.io/gh/apache/gobblin/pull/3704/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1230000099


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true")).resolve();
 
-    String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);

Review Comment:
   Nit: within this test we seem to use `this.` as a prefix for member variables, so let's follow that convention.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
+      Duration difference = workflowDuration.minus(throttleTimeoutDuration);

Review Comment:
   Nit: This variable doesn't add much value here. IMO something like
   ```
   workflowDuration.minus(throttleTimeoutDuration).isNegative()
   ```
   
   seems reasonably readable.
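
For illustration, the inlined check could sit in a small helper like the one below. `ThrottleCheckSketch` and `isWithinThrottle` are hypothetical names, not part of the PR; only `java.time` is used.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper wrapping the inlined duration comparison. */
final class ThrottleCheckSketch {
  /** True when less than throttleTimeout has elapsed between jobStartTime and now. */
  static boolean isWithinThrottle(Instant jobStartTime, Instant now, Duration throttleTimeout) {
    return Duration.between(jobStartTime, now).minus(throttleTimeout).isNegative();
  }
}
```

At exactly the timeout the difference is zero, which is not negative, so the job is no longer treated as throttled. `Duration.between(jobStartTime, now).compareTo(throttleTimeout) < 0` expresses the same condition.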



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
+  private final String workflowIdSuffix3 = "_1504201348473";
+
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant shortPeriod = Instant.ofEpochMilli(1);

Review Comment:
   Nit: `Period` doesn't seem quite right here since these are `Instant`s, i.e. points in time. Maybe something like
   `shortTimeLater` and `longTimeLater` would make more sense.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, Instant.now());

Review Comment:
   Maybe `workflowRunningDuration` is a more descriptive name. @ZihanLi58, can you chime in here?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+  public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
+  public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;

Review Comment:
   The reason we would call this `Throttle` is that we are preventing frequent replanner restarts from executing in the Gobblin AM. In other words, once a Helix workflow has been computed by a regular start-up, a replanner, or a nurse replanner, there is no reason to recompute and resubmit it shortly afterwards.
   
   This is because these workflows have a significant "cold" start time, and there would be no benefit from allowing them to trigger shortly one after another.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   Also just my opinion, but `Instant` (or I guess `Timestamp`) is less error-prone to write code for than millis.
   
   I don't have an opinion on changing the above duration to a millis `long` to fit the rest of the class. But `Instant` vs. `long` is a big deal because a `long` is hard to reason about. It often refers to epoch millis, but you always have to add "epochMillis" to the name of the map to make that clear.
   
   As for `java.time.Instant` vs. `java.sql.Timestamp`, I've seen `Instant` used elsewhere and personally haven't seen `Timestamp` used much. So clearly there are multiple pockets of usage, and either one probably makes sense.
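
To make the naming point concrete, here is a small side-by-side sketch. `StartTimeMaps` and both field names are hypothetical; only JDK classes are used. With `Long` values the unit has to live in the identifier, while with `Instant` the type carries it.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical comparison of the two map value types. */
final class StartTimeMaps {
  // With long values, "EpochMillis" in the name is the only hint about the unit.
  final Map<String, Long> jobNameToStartEpochMillis = new ConcurrentHashMap<>();
  // With Instant values, the unit is part of the type.
  final Map<String, Instant> jobNameToStartTime = new ConcurrentHashMap<>();

  /** Assumes jobName is present; the caller must pass epoch millis, not seconds. */
  Duration elapsedFromMillis(String jobName, long nowEpochMillis) {
    return Duration.ofMillis(nowEpochMillis - jobNameToStartEpochMillis.get(jobName));
  }

  /** Assumes jobName is present; no unit can be mixed up here. */
  Duration elapsedFromInstant(String jobName, Instant now) {
    return Duration.between(jobNameToStartTime.get(jobName), now);
  }
}
```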



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   +1 to adjusting the map name. Pretty sure the key is the jobName, which refers to the Gobblin configuration `job.name`.
   
   



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
+      Duration difference = workflowDuration.minus(throttleTimeoutDuration);
+      if (difference.isNegative()) {

Review Comment:
   +1 to adding a log line for when this branch executes.





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1243109243


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
+    if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
+      LOGGER.info("Replanning is skipped for job {}. Current time is "
+          + clock.instant() + " and the next schedulable time would be "

Review Comment:
   `clock.instant()` should be passed using the `{}` syntax; same for the next schedulable time. And instead of getting the value from the map again, use the `nextSchedulableTime` variable.
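
A minimal sketch of what that could look like, assuming an SLF4J-style logger for the `{}` placeholders. `ReplanThrottleSketch`, `shouldSkipReplanning`, and `setNextSchedulableTime` are hypothetical names; the logging call is left as a comment so the block depends only on the JDK. Reading the clock once also means the logged time and the compared time cannot differ.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the throttle check with a single clock read. */
final class ReplanThrottleSketch {
  private final Map<String, Instant> jobNameToNextSchedulableTime = new ConcurrentHashMap<>();
  private final Clock clock;
  private final boolean isThrottleEnabled;

  ReplanThrottleSketch(Clock clock, boolean isThrottleEnabled) {
    this.clock = clock;
    this.isThrottleEnabled = isThrottleEnabled;
  }

  void setNextSchedulableTime(String jobName, Instant nextSchedulableTime) {
    jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
  }

  /** True when throttling is on and the job's next schedulable time is still in the future. */
  boolean shouldSkipReplanning(String jobName) {
    Instant now = clock.instant(); // read once, reuse for both the check and the log
    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
    if (isThrottleEnabled && now.isBefore(nextSchedulableTime)) {
      // With an SLF4J-style logger (an assumption here), the call would be:
      // LOGGER.info("Replanning is skipped for job {}. Current time is {} and the next schedulable time would be {}",
      //     jobName, now, nextSchedulableTime);
      return true;
    }
    return false;
  }
}
```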



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
       throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager)
+      throws Exception {
+    // Poll helix for up to 30 seconds to fetch until a workflow with a matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
       if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+        return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+    String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+    String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean isSameWorkflow) throws Exception {
+    Clock mockClock = Mockito.mock(Clock.class);
+    AtomicInteger count = new AtomicInteger(0);
+    when(mockClock.instant()).thenAnswer(invocation -> count.getAndIncrement() == 0 ? beginTime : mockedTime);
+
+    // Use GobblinHelixManagerFactory instead of HelixManagerFactory to avoid the connection error
+    // helixManager is set to local variable to avoid the HelixManager (ZkClient) is not connected error across tests
+    HelixManager helixManager = GobblinHelixManagerFactory
+        .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+            zkConnectingString);
+    GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager, isThrottleEnabled, mockClock);
+    final Properties properties =
+        GobblinHelixJobLauncherTest.generateJobProperties(

Review Comment:
   Nit: was this meant to be on a new line? Seems like it would fit fine on the line above



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {

Review Comment:
   Shouldn't this be `ConcurrentHashMap<String, Instant>` instead of just `ConcurrentHashMap`?
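   
   For illustration, a minimal sketch of the parameterized form. The class `TypedMapSketch` and its accessor are hypothetical stand-ins, not code from the PR; only the constructor parameter type is the point. With the type arguments in place the compiler rejects a map with other key or value types, whereas a raw `ConcurrentHashMap` parameter only yields an unchecked warning on assignment.
   
   ```java
   import java.time.Instant;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-in for the listener; only the parameter type matters here.
   public class TypedMapSketch {
     private final ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;

     // Parameterized type: callers cannot pass a map with other key/value types.
     public TypedMapSketch(ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime) {
       this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
     }

     // Returns the recorded time, or Instant.EPOCH when the job has no entry yet.
     public Instant nextSchedulableTime(String jobName) {
       return jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
     }
   }
   ```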



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
       throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",

Review Comment:
   Use the `GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY` instead of the raw string value



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   Super minor nit. Not sure if it's even worth implementing:
   
   Would we want to reset the `Instant` in the map to `Instant.EPOCH` if we delete a workflow? My understanding is that internally we don't use this delete job config method and only rely on update, so this wouldn't really affect our own use case.
   
   I am not sure which behavior is more intuitive:
   1. If I explicitly delete, I should be able to reschedule it and bypass the throttle time
   2. Regardless of whether I deleted the old flow, the throttle time should prevent resubmission



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +140,194 @@ public void setUp()
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
       throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    Config helixJobSchedulerConfig = ConfigFactory.empty().withValue("helix.job.scheduling.throttle.enabled",
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {

Review Comment:
   Random question, but why do we use `NewJobConfigArrivalEvent` instead of just passing a string job name? There were some places in the code where we constructed a brand new `NewJobConfigArrivalEvent` just to pass it into this method.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,35 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        scheduleJob(jobProps, listener);
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);

Review Comment:
   Random question, but is there a reason we use `Instant.MIN` as the default value here and `Instant.EPOCH` as the placeholder elsewhere?
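   
   A small sketch of how the two sentinels compare, assuming the throttle check is a plain `isBefore` comparison as in the diff above. The class and method names are hypothetical. Both values behave the same in the comparison for any clock reading at or after 1970; the practical difference I am aware of is that arithmetic on `Instant.MIN` throws, so `Instant.EPOCH` is the safer choice if the stored value is ever adjusted.
   
   ```java
   import java.time.DateTimeException;
   import java.time.Duration;
   import java.time.Instant;

   public class InstantSentinelSketch {
     // True when "now" is still before the next schedulable time,
     // i.e. the update would be throttled.
     static boolean isThrottled(Instant now, Instant nextSchedulableTime) {
       return now.isBefore(nextSchedulableTime);
     }

     // Reports whether a duration can be subtracted from the sentinel.
     // Instant.MIN is the smallest representable instant, so subtracting from it throws.
     static boolean canSubtract(Instant sentinel, Duration d) {
       try {
         sentinel.minus(d);
         return true;
       } catch (DateTimeException e) {
         return false;
       }
     }
   }
   ```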



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -303,7 +332,7 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
   }
 
   @Subscribe
-  public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
+  public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {

Review Comment:
   I see a race condition here. Although only one thread can execute this method at a time, the scheduling itself is still asynchronous, i.e. the `onJobPrepare` method is called asynchronously.
   
   Suppose we have 2 threads `T1` and `T2`. And consider the following interleaving:
   ```
   Synchronous:
     T1 calls handleNewJobConfigArrival
     T1 submits listener to executor
     T1 finishes handleNewJobConfigArrival
   Synchronous:
     T2 calls handleNewJobConfigArrival
     T2 submits listener to executor
     T2 finishes handleNewJobConfigArrival
   
   Executor picks up runnable submitted by T1
   Executor picks up runnable submitted by T2
   ```
   
   This would fail to throttle, since both runnables were submitted to the executor before either one updated the map. We should update the map before submitting, and if there is a `JobException` we should reset the time the same way the listener does.
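   
   A rough sketch of that idea, under the assumption that the reservation happens inside the already `synchronized` handler. `ThrottleReservationSketch`, `tryReserve` and `release` are hypothetical names for illustration, not part of the PR.
   
   ```java
   import java.time.Clock;
   import java.time.Duration;
   import java.time.Instant;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical helper: reserves the throttle window before any async submission.
   public class ThrottleReservationSketch {
     private final ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime =
         new ConcurrentHashMap<>();
     private final Duration throttleTimeout;
     private final Clock clock;

     public ThrottleReservationSketch(Duration throttleTimeout, Clock clock) {
       this.throttleTimeout = throttleTimeout;
       this.clock = clock;
     }

     // Call before handing the job to the executor.
     // Returns false if the job is still inside its throttle window.
     public synchronized boolean tryReserve(String jobName) {
       Instant now = clock.instant();
       Instant next = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
       if (now.isBefore(next)) {
         return false;
       }
       jobNameToNextSchedulableTime.put(jobName, now.plus(throttleTimeout));
       return true;
     }

     // Call when submission fails, mirroring the listener's reset on failure.
     public synchronized void release(String jobName) {
       jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
     }
   }
   ```
   
   With this shape, T2 in the interleaving above would see the window already reserved by T1 and return early, regardless of when the executor picks up T1's runnable.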



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   The current behavior is (2). To get behavior (1), we would:
   1. Save the value currently stored in the map
   2. Set the value in the map to `Instant.EPOCH`
   3. If there is a job exception, reset the value back to the original value that was in the map
   
   The delete operations are synchronous and the method is `synchronized`, so this approach would be thread safe.
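   
   The three steps could look roughly like the following. This is only a sketch: `DeleteResetSketch`, the `DeleteAction` interface and the accessor methods are hypothetical stand-ins for the real delete logic in the scheduler.
   
   ```java
   import java.time.Instant;
   import java.util.concurrent.ConcurrentHashMap;

   public class DeleteResetSketch {
     private final ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime =
         new ConcurrentHashMap<>();

     // Hypothetical stand-in for the real delete logic, which may fail.
     public interface DeleteAction {
       void run() throws Exception;
     }

     public synchronized boolean deleteAndResetThrottle(String jobName, DeleteAction deleteAction) {
       // 1. Remember the value currently in the map (null if the job has no entry).
       Instant previous = jobNameToNextSchedulableTime.get(jobName);
       // 2. Clear the throttle so a resubmission is allowed immediately.
       jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
       try {
         deleteAction.run();
         return true;
       } catch (Exception e) {
         // 3. The delete failed, so put the original value back.
         if (previous == null) {
           jobNameToNextSchedulableTime.remove(jobName);
         } else {
           jobNameToNextSchedulableTime.put(jobName, previous);
         }
         return false;
       }
     }

     public void put(String jobName, Instant nextSchedulableTime) {
       jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
     }

     public Instant get(String jobName) {
       return jobNameToNextSchedulableTime.get(jobName);
     }
   }
   ```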





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1242790138


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));
       }
+
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+  public synchronized void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    if (this.isThrottleEnabled &&
+        this.jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)).isAfter(clock.instant())) {

Review Comment:
   Nit: This line is a bit dense. Also, to indicate the beginning of time, `Instant` provides `Instant.MIN` and `Instant.EPOCH`, either of which would be more readable than `Instant.ofEpochMilli(0)`.
   
   It also feels a little unintuitive to read this as "nextSchedulableTime is after current time". I find it more natural as
   
   "current time is before nextSchedulableTime", i.e.
   ```
   clock.instant().isBefore(jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.ofEpochMilli(0)))
   ```
   or IMO even more readable
   ```
   Instant nextSchedulableTime = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
   if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) {
   ...
   ```



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));
+    } else {
+      Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+      LOG.info(jobContext.getJobName() + " finished completion. The next schedulable time is " + nextSchedulableTime );
+    }
+  }
+
+  @Override
+  public void onJobCancellation(JobContext jobContext)
+      throws Exception {
+    super.onJobCancellation(jobContext);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));

Review Comment:
   Same as for job failed. We'd want something similar. 



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant withinThrottlePeriod = beginTime.plus(1, ChronoUnit.SECONDS);

Review Comment:
   Nit: seems like these can be final



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));

Review Comment:
   Also, instead of ofEpochMilli(0), let's use `Instant.EPOCH`
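
   For reference, a minimal sketch of why the two spellings are interchangeable. `EpochSentinelDemo` and `resetValue` are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.time.Instant;

// Hypothetical demo class, not part of Gobblin.
final class EpochSentinelDemo {

  // The sentinel meaning "schedulable immediately": the named constant
  // says the same thing as Instant.ofEpochMilli(0), but states the intent.
  static Instant resetValue() {
    return Instant.EPOCH;
  }

  public static void main(String[] args) {
    // Both expressions denote 1970-01-01T00:00:00Z.
    System.out.println(Instant.EPOCH.equals(Instant.ofEpochMilli(0))); // prints "true"
    System.out.println(resetValue());
  }
}
```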



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));
+    } else {
+      Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+      LOG.info(jobContext.getJobName() + " finished completion. The next schedulable time is " + nextSchedulableTime );

Review Comment:
   Maybe `is completed` instead of `finished completion`?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));
       }
+

Review Comment:
   nit: Does not need a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -443,6 +487,10 @@ private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
     }
   }
 
+  public void setThrottleEnabled(boolean throttleEnabled) {

Review Comment:
   Use Lombok `@Setter` instead of declaring a setter by hand. Also, why do we need this? Isn't this value settable via the config, even for testing?
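
   A rough sketch of the config-driven alternative, using plain `java.util.Properties` so that it stays self-contained. The class name, both key names, and the `false` default are assumptions made for illustration and are not the keys this PR defines; the one-hour default timeout follows the PR description.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical holder for throttle settings; not a Gobblin class.
final class ThrottleSettings {
  // Assumed key names, for illustration only.
  static final String THROTTLE_ENABLED_KEY = "example.helix.job.scheduling.throttle.enabled";
  static final String THROTTLE_TIMEOUT_SECONDS_KEY = "example.helix.job.scheduling.throttle.timeout.seconds";

  final boolean enabled;
  final Duration timeout;

  ThrottleSettings(Properties props) {
    // Assumed default: throttling off unless the key is present.
    this.enabled = Boolean.parseBoolean(props.getProperty(THROTTLE_ENABLED_KEY, "false"));
    // Default of one hour (3600 seconds), matching the PR description.
    this.timeout = Duration.ofSeconds(Long.parseLong(props.getProperty(THROTTLE_TIMEOUT_SECONDS_KEY, "3600")));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(THROTTLE_ENABLED_KEY, "true");
    ThrottleSettings settings = new ThrottleSettings(props);
    System.out.println(settings.enabled + " " + settings.timeout);
  }
}
```

   With this shape a test can toggle throttling by building a different `Properties` object, so no setter is needed on the scheduler.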



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );

Review Comment:
   Nit: grammar "finished preparing"



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );

Review Comment:
   Nit: remove the extra whitespace after `nextSchedulableTime`



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.ofEpochMilli(0));

Review Comment:
   Add a log if the job failed. There is an existing log for the entire job context, but a log specifically from the throttling scheduler would be important here for people who are debugging and are not familiar with the code.
   
   https://github.com/apache/gobblin/blob/702cadf48f910c79b129032aa673f08ce4397c03/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/AbstractJobListener.java#L59-L63
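
   A minimal sketch of what such a log could look like. `ThrottleBookkeepingSketch` is a hypothetical, simplified stand-in for the listener: it takes a job name and a boolean instead of a `JobContext`, uses `java.util.logging` rather than slf4j, and its `isSchedulable` check is an assumption about how the scheduler consults the map.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for the throttling listener; not Gobblin's actual class.
final class ThrottleBookkeepingSketch {
  private static final Logger LOG = Logger.getLogger(ThrottleBookkeepingSketch.class.getName());

  private final ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime = new ConcurrentHashMap<>();
  private final Duration throttleTimeout;
  private final Clock clock;

  ThrottleBookkeepingSketch(Duration throttleTimeout, Clock clock) {
    this.throttleTimeout = throttleTimeout;
    this.clock = clock;
  }

  void onJobCompletion(String jobName, boolean failed) {
    if (failed) {
      // Reset the entry so a failed job can be rescheduled right away, and say so in the log.
      jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
      LOG.info(jobName + " failed. The next schedulable time is reset to " + Instant.EPOCH);
    } else {
      Instant nextSchedulableTime = clock.instant().plus(throttleTimeout);
      jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
      LOG.info(jobName + " is completed. The next schedulable time is " + nextSchedulableTime);
    }
  }

  // Assumed scheduler-side check: a job is schedulable once "now" has reached its recorded time.
  boolean isSchedulable(String jobName) {
    Instant next = jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
    return !clock.instant().isBefore(next);
  }

  public static void main(String[] args) {
    Clock fixed = Clock.fixed(Instant.ofEpochSecond(1_000), ZoneOffset.UTC);
    ThrottleBookkeepingSketch sketch = new ThrottleBookkeepingSketch(Duration.ofHours(1), fixed);
    sketch.onJobCompletion("jobA", false);
    System.out.println(sketch.isSchedulable("jobA")); // prints "false": still inside the throttle window
    sketch.onJobCompletion("jobA", true);
    System.out.println(sketch.isSchedulable("jobA")); // prints "true": failure reset the window
  }
}
```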



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
-@Test(groups = {"gobblin.cluster"})
+
+/**
+ * In all test cases, we use GobblinHelixManagerFactory instead of
+ * HelixManagerFactory, and use HelixManager as a local variable to avoid
+ * the HelixManager (ZkClient) is not connected error when that's set as

Review Comment:
   Potentially better wording:
   > and instantiate a local HelixManager per test to provide isolation and prevent errors caused by the ZKClient being shared (e.g. ZKClient is not connected exceptions). 



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private Instant beginTime = Instant.ofEpochMilli(0);

Review Comment:
   `Instant.EPOCH`



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
-@Test(groups = {"gobblin.cluster"})
+

Review Comment:
   Merge this with the Javadoc above



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is enabled
+  // Job will be updated

Review Comment:
   Comments describing the method should be a Javadoc instead of a regular comment



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    }
+    else {
+      gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+    }
+    gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);

Review Comment:
   nit: wouldn't we want to set this via config?



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),

Review Comment:
   We can inject the clock regardless of whether throttling is enabled. We'd never want to use the UTC clock in a unit test IMO
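
   One possible way to do that without Mockito, assuming "UTC clock" here refers to the system clock (`Clock.systemUTC()`). `MutableTestClock` is a hypothetical helper, not something this PR contains; the PR itself stubs `clock.instant()` with Mockito's `when`.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical test clock whose time only moves when the test says so.
final class MutableTestClock extends Clock {
  private Instant now;

  MutableTestClock(Instant start) {
    this.now = start;
  }

  // Move the clock forward by the given amount.
  void advance(Duration amount) {
    now = now.plus(amount);
  }

  @Override
  public ZoneId getZone() {
    return ZoneOffset.UTC;
  }

  @Override
  public Clock withZone(ZoneId zone) {
    // Simplification: the zone is irrelevant when only instant() is used.
    return this;
  }

  @Override
  public Instant instant() {
    return now;
  }

  public static void main(String[] args) {
    MutableTestClock clock = new MutableTestClock(Instant.EPOCH);
    Instant beginTime = clock.instant();
    clock.advance(Duration.ofSeconds(1)); // analogous to withinThrottlePeriod in the test
    System.out.println(Duration.between(beginTime, clock.instant()).getSeconds()); // prints "1"
  }
}
```

   Passing a clock like this to the scheduler in both the throttled and unthrottled cases would remove the `if (isThrottleEnabled)` branch around the constructor call in the test helper.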



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, "UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, "UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog, clock);
+    }
+    else {
+      gobblinHelixJobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+    }
+    gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager)
+      throws Exception {
+    // endTime is manually set time period that we allow HelixUtils to fetch workflowIdMap before timeout

Review Comment:
   Maybe better wording:
   > Poll Helix for up to 30 seconds until a workflow with a matching job name exists, then return that workflow ID
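
The polling behavior described in that wording can be sketched in isolation. This is only an illustration, not the test's actual helper: `PollUntilPresent` and its `poll` method are hypothetical names, and the lookup is an arbitrary supplier standing in for the Helix query.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class PollUntilPresent {
  // Hypothetical helper: calls `lookup` repeatedly until it yields a value
  // or `timeoutMillis` has elapsed, sleeping `intervalMillis` between attempts.
  public static <T> Optional<T> poll(Supplier<Optional<T>> lookup, long timeoutMillis, long intervalMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      Optional<T> result = lookup.get();
      if (result.isPresent()) {
        return result;
      }
      Thread.sleep(intervalMillis);
    }
    // Timed out without finding a match.
    return Optional.empty();
  }
}
```

In the test under review, the timeout would be the 30000 ms bound and the interval the 100 ms sleep; the supplier would wrap the workflow ID lookup.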



##########
gobblin-cluster/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker:
##########
@@ -0,0 +1 @@
+mock-maker-inline

Review Comment:
   I don't think we need this anymore, since we are no longer mocking any static methods.
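
For context, the updated test helper passes a `Clock` into the scheduler rather than mocking `Instant.now()`, which is presumably why inline mocking is no longer required. A minimal sketch of that pattern follows; `ClockInjectionSketch` and `fixedAt` are hypothetical names, not part of Gobblin.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

public class ClockInjectionSketch {
  private final Clock clock;

  // The clock is supplied by the caller, so tests can control time
  // without mocking any static method.
  public ClockInjectionSketch(Clock clock) {
    this.clock = clock;
  }

  public Instant currentTime() {
    return clock.instant();
  }

  // Hypothetical test helper: a clock frozen at the given epoch millisecond.
  public static Clock fixedAt(long epochMillis) {
    return Clock.fixed(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
  }
}
```

Production code would pass `Clock.systemUTC()`; a test would pass a fixed clock for whichever point in time it wants to simulate.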



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231364395


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+        new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties, String suffix) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + suffix);
+    return newJobConfigArrivalEvent;
+  }
 
-    String workFlowId = null;
-    long endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
-    }
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
     Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
+      throws Exception {
+    long endTime = System.currentTimeMillis() + 30000;

Review Comment:
   Yes, it would be good to explain why this value was chosen!





[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "umustafi (via GitHub)" <gi...@apache.org>.
umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1230375126


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -162,6 +167,11 @@ public GobblinHelixJobScheduler(Config sysConfig,
     this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;
 
+    this.throttleTimeoutDuration = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,

Review Comment:
   Rename this to specify the units as well, e.g. `throttleTimeoutDurationSecs`.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory

Review Comment:
   Can you reuse `helixManager` across tests?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   Does this default to false if the config is not provided? If not, provide a default value.
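
One way to make the fallback explicit, sketched with plain `java.util.Properties` rather than Gobblin's `PropertiesUtils` (whose behavior is not assumed here). The key string is the one introduced in this PR; the class and method names are hypothetical.

```java
import java.util.Properties;

public class ThrottleFlagDefaultSketch {
  // Key string taken from the PR's GobblinClusterConfigurationKeys change.
  static final String THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
  static final boolean DEFAULT_THROTTLE_ENABLED = false;

  // Returns the configured flag, falling back to the default when the key is absent.
  public static boolean isThrottleEnabled(Properties jobProps) {
    String raw = jobProps.getProperty(THROTTLE_ENABLED_KEY, String.valueOf(DEFAULT_THROTTLE_ENABLED));
    return Boolean.parseBoolean(raw);
  }
}
```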



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
+  private final String workflowIdSuffix3 = "_1504201348473";
+
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant shortPeriod = Instant.ofEpochMilli(1);
+  private Instant longPeriod = Instant.ofEpochMilli(3600001);

Review Comment:
   Perhaps rename these more clearly, e.g. `withinThrottlePeriod` and `exceedsThrottlePeriod`.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   `jobUriToStartTimeMap`? It would be better if you could clarify what the string key is. Also, let's be consistent between START and CREATE time, which you mention in the description. Why do we use `Instant` rather than `Timestamp` or `Long` (milliseconds)? The latter is typically what I see used in our code.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   Usually booleans are easily identified with an `is...` prefix, like `isThrottleEnabled`.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
+      Duration difference = workflowDuration.minus(throttleTimeoutDuration);
+      if (difference.isNegative()) {

Review Comment:
   Add a comment here to say that we skip recalculation if `workflowDuration` < `throttleTimeoutDuration`. It's also worth adding a log statement here so you can track when replanning is skipped.
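
A standalone sketch of what the commented and logged check might look like. It is not the scheduler's code: `ThrottleCheckSketch` and `shouldSkipReplanning` are hypothetical names, `java.util.logging` stands in for the project's logger, and the times are passed in as arguments to keep the example self-contained.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.logging.Logger;

public class ThrottleCheckSketch {
  private static final Logger LOGGER = Logger.getLogger(ThrottleCheckSketch.class.getName());

  // Returns true when the update should be ignored because the job last
  // started less than `throttleTimeout` ago and throttling is enabled.
  public static boolean shouldSkipReplanning(String jobName, Instant lastStartTime, Instant now,
      Duration throttleTimeout, boolean isThrottleEnabled) {
    if (!isThrottleEnabled || lastStartTime == null) {
      return false;
    }
    Duration sinceLastStart = Duration.between(lastStartTime, now);
    // Skip replanning if the time since the last start is shorter than the throttle timeout.
    if (sinceLastStart.compareTo(throttleTimeout) < 0) {
      LOGGER.info("Skipping replanning of job " + jobName + ": last start was " + sinceLastStart
          + " ago, within the throttle timeout of " + throttleTimeout);
      return true;
    }
    return false;
  }
}
```

Comparing the two durations directly with `compareTo` avoids the intermediate `difference` value and keeps the comment and the condition in the same terms.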



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+  public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
+  public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;

Review Comment:
   I'm not sure the word `throttle` captures what this key is actually used for. Phrases that make more sense to me:
   - Short circuit replanning
   - Early exit
   - Shortcut evaluation



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()

Review Comment:
   Tests should be easy to follow, ideally with a short comment above each test describing the case. The name is quite helpful, but an additional comment above or within the code itself will help. I know that short period + no throttle means do not skip, but you want to document this behavior somewhere.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");

Review Comment:
   if there's no `properties2`, then just name this `properties`



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+        new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties, String suffix) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + suffix);
+    return newJobConfigArrivalEvent;
+  }
 
-    String workFlowId = null;
-    long endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
-    }
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
     Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
+      throws Exception {
+    long endTime = System.currentTimeMillis() + 30000;

Review Comment:
   Why 30000 here? It is not obvious. Please add a comment explaining what this end time represents.
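
One way to make the intent explicit is a named timeout plus a small polling helper. The sketch below is only an illustration: `WorkflowIdPoller`, `WORKFLOW_ID_LOOKUP_TIMEOUT`, `POLL_INTERVAL`, and `pollUntilPresent` are hypothetical names, and the `Supplier` stands in for the `HelixUtils.getWorkflowIdsFromJobNames` lookup in the diff. The 30 s and 100 ms values mirror the literals `30000` and `100` in the original loop.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper; names and structure are illustrative, not from the PR.
class WorkflowIdPoller {
  // How long to wait for the workflow ID to appear before giving up.
  static final Duration WORKFLOW_ID_LOOKUP_TIMEOUT = Duration.ofSeconds(30);
  // Pause between lookups, so the loop does not spin.
  static final Duration POLL_INTERVAL = Duration.ofMillis(100);

  // Calls `lookup` until it yields a value or `timeout` elapses.
  static <T> Optional<T> pollUntilPresent(
      Supplier<Optional<T>> lookup, Duration timeout, Duration interval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (System.nanoTime() - deadline < 0) {
      Optional<T> result = lookup.get();
      if (result.isPresent()) {
        return result;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return Optional.empty();
      }
    }
    return Optional.empty();
  }
}
```

With the constants named, the call site reads as "poll for the workflow ID for up to 30 seconds", and the bare number no longer needs a separate explanation.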



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);

Review Comment:
   Same here: you can probably reuse code, since a lot of it is repeated. Let's try to keep this DRY (don't repeat yourself).
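
As a rough sketch of what the shared part could look like, the eight tests differ only in three inputs and one expected suffix, so they can be described as data. `UpdateScenarioMatrix` and `Scenario` are hypothetical names, and the expected suffixes are read off the assertions in this diff. The generated name for the same-workflow, long-period, throttled case corresponds to the existing `testNewJobAndUpdate`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical test matrix; class and field names are illustrative only.
class UpdateScenarioMatrix {
  static final class Scenario {
    final boolean sameWorkflow;
    final boolean longPeriod;
    final boolean throttleEnabled;
    final String expectedSuffix;

    Scenario(boolean sameWorkflow, boolean longPeriod, boolean throttleEnabled, String expectedSuffix) {
      this.sameWorkflow = sameWorkflow;
      this.longPeriod = longPeriod;
      this.throttleEnabled = throttleEnabled;
      this.expectedSuffix = expectedSuffix;
    }

    // Builds a name in the same pattern as the test names in the diff.
    String testName() {
      return "Update" + (sameWorkflow ? "Same" : "Diff") + "Workflow"
          + (longPeriod ? "Long" : "Short") + "Period"
          + (throttleEnabled ? "Throttle" : "NoThrottle");
    }
  }

  // Enumerates all 2 x 2 x 2 combinations with the suffix each test asserts.
  static List<Scenario> all(String suffix1, String suffix2, String suffix3) {
    List<Scenario> scenarios = new ArrayList<>();
    for (boolean same : new boolean[] {true, false}) {
      for (boolean longPeriod : new boolean[] {true, false}) {
        for (boolean throttle : new boolean[] {true, false}) {
          String expected;
          if (!same) {
            expected = suffix3; // a different workflow is always scheduled
          } else if (throttle && !longPeriod) {
            expected = suffix1; // update skipped, original workflow stays
          } else {
            expected = suffix2; // update applied
          }
          scenarios.add(new Scenario(same, longPeriod, throttle, expected));
        }
      }
    }
    return scenarios;
  }
}
```

A single helper taking one `Scenario` could then hold the scheduler setup, the update call, and the assertion, leaving each test as a one-line call or a data-provider row.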



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] ZihanLi58 merged pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 merged PR #3704:
URL: https://github.com/apache/gobblin/pull/3704



[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: Py helix scheduler throttle gobblin 1840

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1229996447


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,14 +113,16 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;
 
   public GobblinHelixJobScheduler(Config sysConfig,
-                                  HelixManager jobHelixManager,
-                                  Optional<HelixManager> taskDriverHelixManager,
-                                  EventBus eventBus,
-                                  Path appWorkDir, List<? extends Tag<?>> metadataTags,
-                                  SchedulerService schedulerService,
-                                  MutableJobCatalog jobCatalog) throws Exception {
+      HelixManager jobHelixManager,

Review Comment:
   Let's revert random whitespace changes





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231302778


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+        new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties, String suffix) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + suffix);
+    return newJobConfigArrivalEvent;
+  }
 
-    String workFlowId = null;
-    long endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
-    }
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
     Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
+      throws Exception {
+    long endTime = System.currentTimeMillis() + 30000;

Review Comment:
   The `endTime` setting follows the original `testNewJobAndUpdate` approach. Maybe I should add a comment here explaining that this hardcoded 30 seconds is how long we keep retrying the workflow ID lookup before timing out?
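
   One way to make the hardcoded value self-explanatory is to give it a name and keep the retry loop in a small helper. The following is only a minimal sketch under assumptions: `PollSketch`, `WORKFLOW_ID_LOOKUP_TIMEOUT`, `POLL_INTERVAL`, and `pollUntilPresent` are hypothetical names that do not come from the PR, and the `Supplier` stands in for the `HelixUtils.getWorkflowIdsFromJobNames` lookup.

   ```java
   import java.time.Duration;
   import java.util.Optional;
   import java.util.function.Supplier;

   // Hypothetical helper, not part of the PR: names are illustrative only.
   class PollSketch {
     // How long we keep retrying the lookup before giving up (30000 ms in the test).
     static final Duration WORKFLOW_ID_LOOKUP_TIMEOUT = Duration.ofSeconds(30);
     // Pause between two lookup attempts (100 ms in the original loop).
     static final Duration POLL_INTERVAL = Duration.ofMillis(100);

     // Calls 'lookup' repeatedly until it yields a value or 'timeout' elapses.
     static <T> Optional<T> pollUntilPresent(Supplier<Optional<T>> lookup, Duration timeout, Duration interval) {
       long deadline = System.currentTimeMillis() + timeout.toMillis();
       while (System.currentTimeMillis() < deadline) {
         Optional<T> result = lookup.get();
         if (result.isPresent()) {
           return result;
         }
         try {
           Thread.sleep(interval.toMillis());
         } catch (InterruptedException e) {
           // Restore the interrupt flag and stop polling.
           Thread.currentThread().interrupt();
           return Optional.empty();
         }
       }
       return Optional.empty();
     }
   }
   ```

   With something like this, the test helper could pass the two named constants instead of repeating the literals, and the timeout comment would live in one place.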





[GitHub] [gobblin] Peiyingy commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "Peiyingy (via GitHub)" <gi...@apache.org>.
Peiyingy commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1231395859


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   The signature of `getPropAsBoolean` is:
   ```
   public static boolean getPropAsBoolean(@NotNull Properties properties,
     String key,
     String defaultValue)
   ```
   so it falls back to `String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY)` as the default value if `GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY` is not set.
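
   To illustrate the fallback, here is a minimal sketch that assumes the lookup behaves like `Properties.getProperty(key, defaultValue)` followed by `Boolean.parseBoolean`. It is a local re-implementation with the same three-argument shape, not the actual `PropertiesUtils` code, and the key name and default value below are hypothetical.

   ```java
   import java.util.Properties;

   // Illustrative stand-in; the real key and default live in GobblinClusterConfigurationKeys.
   class ThrottleFlagSketch {
     static final String THROTTLE_ENABLED_KEY = "example.throttle.enabled"; // hypothetical key
     static final boolean DEFAULT_THROTTLE_ENABLED = false;                 // hypothetical default

     // Same parameter shape as the quoted signature: the default arrives as a String.
     static boolean getPropAsBoolean(Properties properties, String key, String defaultValue) {
       return Boolean.parseBoolean(properties.getProperty(key, defaultValue));
     }

     // The boolean default is converted with String.valueOf, as in the PR.
     static boolean isThrottleEnabled(Properties jobConfig) {
       return getPropAsBoolean(jobConfig, THROTTLE_ENABLED_KEY, String.valueOf(DEFAULT_THROTTLE_ENABLED));
     }
   }
   ```

   A job config that never sets the key therefore resolves to the default, while an explicit "true" or "false" in the job properties overrides it.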





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244382872


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -360,7 +400,7 @@ private void waitForJobCompletion(String jobName) {
   }
 
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+  public synchronized void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {

Review Comment:
   1. Since `handleDeleteJobConfigArrival` is a completely synchronous method, the synchronized method `handleUpdateJobConfigArrival` would just hold the lock while deleting and then proceed to `handleNewJobConfigArrival`. There would be no need to distinguish between the two, since @Peiyingy needs to address the race condition described in https://github.com/apache/gobblin/pull/3704/files#r1243111712 by updating the map immediately in the newJobConfigArrival method.
   
   2. Yeah, we don't explicitly call delete, so it's just semantics about which behavior is more intuitive if we ever use this in the future. Since this is purely hypothetical, I don't want to waste effort changing the behavior to (1). I think we should just add a comment describing that deleting a workflow with throttling enabled means that the next schedulable time for the workflow remains unchanged, and you have to wait out the throttle timeout before being able to reschedule.
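
   For reference, the behavior described in both points can be sketched roughly as below. This is a simplified stand-in under assumptions, not the scheduler code: `ThrottleSketch`, `tryAcquire`, and `onDelete` are hypothetical names, and the clock is passed in explicitly instead of calling `Instant.now()`.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, simplified model of the throttling discussed in this thread.
   class ThrottleSketch {
     private final ConcurrentHashMap<String, Instant> jobStartTimeMap = new ConcurrentHashMap<>();
     private final Duration throttleTimeout;

     ThrottleSketch(Duration throttleTimeout) {
       this.throttleTimeout = throttleTimeout;
     }

     // Returns true if the job may be (re)scheduled at 'now'. The start time is
     // recorded immediately, before any slow work, so that a second caller
     // arriving right afterwards already sees the new entry.
     synchronized boolean tryAcquire(String jobName, Instant now) {
       Instant last = jobStartTimeMap.get(jobName);
       if (last != null && Duration.between(last, now).compareTo(throttleTimeout) < 0) {
         return false; // still inside the throttle window
       }
       jobStartTimeMap.put(jobName, now);
       return true;
     }

     // Deleting intentionally leaves the map untouched, so the next schedulable
     // time for the job stays the same and the timeout must still elapse.
     synchronized void onDelete(String jobName) {
       // no map update on purpose
     }
   }
   ```

   In this model, a delete followed by a quick resubmission of the same job name is still rejected until the timeout has passed, which is the semantics point 2 proposes to document rather than change.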





[GitHub] [gobblin] homatthew commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244467590


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -415,6 +418,15 @@ private void waitForJobCompletion(String jobName) {
     }
   }
 
+  /***
+   * Deleting a workflow with throttling enabled means that the next
+   * schedulable time for the workflow will remain unchanged.
+   * Note: In such case, it is required to wait until the throttle
+   * timeout period elapses before the workflow can be rescheduled.

Review Comment:
   Nice comment!


