You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "phet (via GitHub)" <gi...@apache.org> on 2023/03/09 02:09:54 UTC

[GitHub] [gobblin] phet commented on a diff in pull request #3656: [GOBBLIN-1797] Skip scheduling flows far into future

phet commented on code in PR #3656:
URL: https://github.com/apache/gobblin/pull/3656#discussion_r1130259453


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -108,24 +113,31 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   @Getter
   protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
   protected volatile int loadSpecsBatchSize = -1;
+  protected int thresholdToSkipSchedulingFlowsAfter;
   @Getter
   private volatile boolean isActive;
   private String serviceName;
-  private volatile Long averageGetSpecTimeValue = -1L;
+  private volatile Long perSpecGetRateValue = -1L;
   private volatile Long timeToInitializeSchedulerValue = -1L;
-  private volatile Long timeToObtainSpecUrisNanosValue = -1L;
-  private volatile Long individualGetSpecSpeedNanosValue = -1L;
-  private volatile Long addSpecTimeNanosValue = -1L;
-  private volatile Long flowCompilationTimeNanosValue = -1L;
-  private volatile Long timeToScheduleOneJobValue = -1L;
-  private final ContextAwareGauge averageGetSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS, () -> this.averageGetSpecTimeValue);;
+  private volatile Long timeToObtainSpecUrisValue = -1L;
+  private volatile Long individualGetSpecSpeedValue = -1L;
+  private volatile Long eachCompleteAddSpecValue = -1L;
+  private volatile Long eachSpecCompilationValue = -1L;
+  private volatile Long eachScheduleJobValue = -1L;
+  private volatile Long totalGetSpecTimeValue = -1L;
+  private volatile Long totalAddSpecTimeValue = -1L;
+  private volatile int numJobsScheduledDuringStartupValue = -1;
+  private final ContextAwareGauge getSpecsPerSpecRateNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS, () -> this.perSpecGetRateValue);;

Review Comment:
   Extra semicolon: the `;;` at the end of this line should be a single `;`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -162,22 +174,26 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
     this.scheduledFlowSpecs = Maps.newHashMap();
     this.lastUpdatedTimeForFlowSpec = Maps.newHashMap();
     this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
+    this.thresholdToSkipSchedulingFlowsAfter = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS, String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));

Review Comment:
   Since this is a number of days, let's name the variable accordingly (e.g. `numDaysToSkipSchedulingFlowsAfter`).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
     }
   }
 
-  /** Helps modify spec before adding to scheduler for adhoc flows */
+  /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
   private void addSpecHelperMethod(Spec spec) {
-    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-    if (spec instanceof FlowSpec && PropertiesUtils
-        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            "false")) {
-      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-      onAddSpec(modifiedSpec);
+    // Adhoc flows will not have any job schedule key, but we should schedule them
+    FlowSpec flowSpec = (FlowSpec) spec;

Review Comment:
   Previously there was an `instanceof` check guarding the cast. Are we absolutely certain the runtime type check (RTTI) will succeed? Even if so, best practice would be to code defensively and keep the `instanceof` guard.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
     }
   }
 
-  /** Helps modify spec before adding to scheduler for adhoc flows */
+  /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
   private void addSpecHelperMethod(Spec spec) {
-    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-    if (spec instanceof FlowSpec && PropertiesUtils
-        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            "false")) {
-      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-      onAddSpec(modifiedSpec);
+    // Adhoc flows will not have any job schedule key, but we should schedule them
+    FlowSpec flowSpec = (FlowSpec) spec;
+    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+        || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+        this.thresholdToSkipSchedulingFlowsAfter)) {
+      // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+      if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+          ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+        Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+        onAddSpec(modifiedSpec);
+      } else {
+        onAddSpec(spec);
+      }
     } else {
-      onAddSpec(spec);
+      _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);

Review Comment:
   This seems noisy: it could log more than 100k lines at info level.
   
   Incrementing `numJobsScheduledDuringStartupValue` apparently happens some distance away from this code. A counter metric like that (tracking the number of skipped flows) feels like a more appropriate substitute for logging each skipped spec.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -251,9 +309,14 @@ private void addSpecHelperMethod(Spec spec) {
    * If it is newly brought up as the DR handler, will load additional FlowSpecs and handle transition properly.
    */
   private void scheduleSpecsFromCatalog() {
+    // TODO: clean up metrics after bottleneck is determined for startup to only keep most important ones

Review Comment:
   If these metrics are helpful to measure now, they are likely helpful to retain, so I'd drop this TODO.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -292,17 +355,23 @@ private void scheduleSpecsFromCatalog() {
         Spec spec = batchOfSpecsIterator.next();
         try {
           addSpecHelperMethod(spec);
-          urisLeftToSchedule.remove(spec.getUri());
+          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
+          actualNumFlowsScheduled += 1;
         } catch (Exception e) {
           // If there is an uncaught error thrown during compilation, log it and continue adding flows
           _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
+        urisLeftToSchedule.remove(spec.getUri());
       }
       startOffset += this.loadSpecsBatchSize;
-      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
-      // smaller than the loadSpecsBatchSize
-      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+      totalGetTime += batchGetEndTime - batchGetStartTime;
+      // Don't skew the average get spec time value with the last batch that may be very small
+      if (batchOfSpecs.size() == this.loadSpecsBatchSize) {

Review Comment:
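   This condition is pretty strict. Some alternatives:
   * always record the rate for the first batch, i.e. when `startOffset == 0` (checked before `startOffset` is incremented), to account for a total number of records smaller than the batch size
   * record it whenever `size() > round(0.75 * batchSize)`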



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -90,7 +90,9 @@ public class ConfigurationKeys {
   public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled";
   public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true";
   public static final String LOAD_SPEC_BATCH_SIZE = "load.spec.batch.size";
-  public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 100;
+  public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
+  public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
+  public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 100;

Review Comment:
   My $0.02:
   
   It sounds like 100 days vs. 365 days shouldn't materially change how many flows get scheduled. However, it would definitely lengthen the window within which a redeployment must happen before skipped flows come due, which is clearly safer in terms of ensuring the presumed redeployment actually happens in time. Hence I also vote for 1 year.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -292,17 +355,23 @@ private void scheduleSpecsFromCatalog() {
         Spec spec = batchOfSpecsIterator.next();
         try {
           addSpecHelperMethod(spec);
-          urisLeftToSchedule.remove(spec.getUri());
+          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
+          actualNumFlowsScheduled += 1;
         } catch (Exception e) {
           // If there is an uncaught error thrown during compilation, log it and continue adding flows
           _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
+        urisLeftToSchedule.remove(spec.getUri());
       }
       startOffset += this.loadSpecsBatchSize;
-      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
-      // smaller than the loadSpecsBatchSize
-      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+      totalGetTime += batchGetEndTime - batchGetStartTime;
+      // Don't skew the average get spec time value with the last batch that may be very small
+      if (batchOfSpecs.size() == this.loadSpecsBatchSize) {
+        perSpecGetRateValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+      }
     }
+    // Reset value after its last value to get an accurate reading
+    perSpecGetRateValue = -1L;

Review Comment:
   If other metrics are to be reset too, let's reset them all together at the end. Ideally we'd even perform the reset in a timer callback scheduled one minute in the future, to ensure each value has finished being emitted before it is cleared.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
     }
   }
 
-  /** Helps modify spec before adding to scheduler for adhoc flows */
+  /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
   private void addSpecHelperMethod(Spec spec) {
-    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-    if (spec instanceof FlowSpec && PropertiesUtils
-        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            "false")) {
-      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-      onAddSpec(modifiedSpec);
+    // Adhoc flows will not have any job schedule key, but we should schedule them
+    FlowSpec flowSpec = (FlowSpec) spec;
+    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+        || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+        this.thresholdToSkipSchedulingFlowsAfter)) {
+      // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+      if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+          ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+        Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+        onAddSpec(modifiedSpec);
+      } else {
+        onAddSpec(spec);
+      }
     } else {
-      onAddSpec(spec);
+      _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
+    }
+  }
+
+  /**
+   * Given a cron expression calculates the time for next run in days from current time, rounding up to the nearest day.
+   * @param cronExpression
+   * @return num days until next run, max integer in the case it cannot be calculated
+   */
+  public static int nextRunInDays(String cronExpression) {
+    CronExpression cron = null;
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    double numMillisInADay = 86400000;
+    try {
+      cron = new CronExpression(cronExpression);
+      cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+      Date now = new Date();
+      Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+      cal.setTime(nextValidTimeAfter);
+      long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+      double diffInDays = diff / numMillisInADay;
+      return (int) Math.round(diffInDays);
+    } catch (ParseException e) {
+      e.printStackTrace();
     }
+    return -1;
+  }
+
+  /**
+   * Returns true if next run for the given cron schedule is sooner than the threshold to skip scheduling after, false
+   * otherwise. If the cron expression cannot be parsed and the next run cannot be calculated returns true to schedule.
+   * @param cronExpression
+   * @param thresholdToSkipScheduling represents number of days
+   */
+  public static boolean isNextRunWithinRangeToSchedule(String cronExpression, int thresholdToSkipScheduling) {

Review Comment:
   Should this be annotated with `@VisibleForTesting`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
     }
   }
 
-  /** Helps modify spec before adding to scheduler for adhoc flows */
+  /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
   private void addSpecHelperMethod(Spec spec) {
-    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-    if (spec instanceof FlowSpec && PropertiesUtils
-        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            "false")) {
-      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-      onAddSpec(modifiedSpec);
+    // Adhoc flows will not have any job schedule key, but we should schedule them
+    FlowSpec flowSpec = (FlowSpec) spec;
+    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+        || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+        this.thresholdToSkipSchedulingFlowsAfter)) {
+      // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+      if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+          ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+        Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+        onAddSpec(modifiedSpec);
+      } else {
+        onAddSpec(spec);
+      }
     } else {
-      onAddSpec(spec);
+      _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
+    }
+  }
+
+  /**
+   * Given a cron expression calculates the time for next run in days from current time, rounding up to the nearest day.
+   * @param cronExpression
+   * @return num days until next run, max integer in the case it cannot be calculated
+   */
+  public static int nextRunInDays(String cronExpression) {
+    CronExpression cron = null;
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    double numMillisInADay = 86400000;
+    try {
+      cron = new CronExpression(cronExpression);
+      cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+      Date now = new Date();
+      Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+      cal.setTime(nextValidTimeAfter);
+      long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+      double diffInDays = diff / numMillisInADay;
+      return (int) Math.round(diffInDays);
+    } catch (ParseException e) {
+      e.printStackTrace();
     }
+    return -1;
+  }
+
+  /**
+   * Returns true if next run for the given cron schedule is sooner than the threshold to skip scheduling after, false
+   * otherwise. If the cron expression cannot be parsed and the next run cannot be calculated returns true to schedule.
+   * @param cronExpression
+   * @param thresholdToSkipScheduling represents number of days
+   */
+  public static boolean isNextRunWithinRangeToSchedule(String cronExpression, int thresholdToSkipScheduling) {
+    int days = nextRunInDays(cronExpression);
+    return days < thresholdToSkipScheduling;

Review Comment:
   This method and its helper could be streamlined, e.g.:
   ```
   boolean isWithinRange(...) {
     CronExpression cron = ...
     long nextCronEventAsEpochMillis = cron.getNextValidTimeAfter(new Date()).getTime();
     return nextCronEventAsEpochMillis <= Instant.now().plus(maxNumDaysToScheduleWithin, ChronoUnit.DAYS).toEpochMilli();
   }
   ```
   (Note that `Date.getTime()` returns epoch milliseconds, so compare against `toEpochMilli()` rather than `getEpochSecond()`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org