You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "umustafi (via GitHub)" <gi...@apache.org> on 2023/06/15 03:34:31 UTC

[GitHub] [gobblin] umustafi commented on a diff in pull request #3704: [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time

umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1230375126


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -162,6 +167,11 @@ public GobblinHelixJobScheduler(Config sysConfig,
     this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
         GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) * 1000;
 
+    this.throttleTimeoutDuration = Duration.of(ConfigUtils.getLong(sysConfig, GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,

Review Comment:
   Rename this to specify the units as well, e.g. `throttleTimeoutDurationSecs`.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory

Review Comment:
   Can you reuse `helixManager` across tests?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   Does this default to false if the config is not provided? If not, provide a default value.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
+  private final String workflowIdSuffix3 = "_1504201348473";
+
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant shortPeriod = Instant.ofEpochMilli(1);
+  private Instant longPeriod = Instant.ofEpochMilli(3600001);

Review Comment:
   Perhaps rename these more clearly, e.g. `withinThrottlePeriod` and `exceedsThrottlePeriod`.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   `jobUriToStartTimeMap`? It would be better if you could clarify what the string key is. Also, let's be consistent between the START/CREATE time you mention in the description. Why do we use Instant rather than Timestamp or Long (milliseconds)? The latter is typically what I see used in our code.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),

Review Comment:
   Usually booleans are easily identified with an `is...` prefix, like `isThrottleEnabled`.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
+      Duration difference = workflowDuration.minus(throttleTimeoutDuration);
+      if (difference.isNegative()) {

Review Comment:
   Add a comment here to say we skip recalculation if workflowDuration is < throttleTimeoutDuration. It's also worth adding a log statement here so you can track when replanning is skipped.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+  public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
+  public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;

Review Comment:
   Not sure the use of the word `throttle` captures what this key is actually used for. Phrases that make more sense to me:
   - Short circuit replanning
   - Early exit
   - Shortcut evaluation



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()

Review Comment:
   Tests should be easy to follow, ideally with a short comment above each test describing the case. The name is quite helpful, but an additional comment above or within the code itself will help. I know that short period + no throttle means -> do not skip, but you want to document this behavior somewhere.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");

Review Comment:
   If there's no `properties2`, then just name this `properties`.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager helixManager) throws Exception {
+    java.nio.file.Path p = Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
-    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+    return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, java.util.Optional.empty(),
+        new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties properties, String suffix) {
+    properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties);
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + suffix);
+    return newJobConfigArrivalEvent;
+  }
 
-    String workFlowId = null;
-    long endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
-    }
+  private void connectAndAssertWorkflowId(String expectedSuffix, NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
     Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
+  private String getWorkflowID (NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
+      throws Exception {
+    long endTime = System.currentTimeMillis() + 30000;

Review Comment:
   Why 30000 here? It's not obvious. Add a comment to explain what end time is being computed.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
   @Test
   public void testNewJobAndUpdate()
       throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+      connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent, helixManager);
+    }
+  }
+
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "false");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+      final Properties properties1 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1", workflowIdSuffix1);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent = createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+      jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+      connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent, helixManager);
+
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2", workflowIdSuffix3);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2);
+      properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY, "true");
+      jobScheduler.handleUpdateJobConfigArrival(
+          new UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties2));
+      connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2, helixManager);
+    }
+  }
+
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    try (MockedStatic<Instant> mocked = mockStatic(Instant.class, CALLS_REAL_METHODS)) {
+      mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+      HelixManager helixManager = HelixManagerFactory
+          .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+              zkConnectingString);
+      GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);

Review Comment:
   Same here: perhaps you can reuse code, since there is a lot of repetition. Let's try to keep this DRY (don't repeat yourself).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org