You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/02/23 18:19:24 UTC

[GitHub] [gobblin] autumnust commented on a change in pull request #3229: GOBBLIN-1390: Add an option to run a subset of jobs from job config d…

autumnust commented on a change in pull request #3229:
URL: https://github.com/apache/gobblin/pull/3229#discussion_r581276828



##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
##########
@@ -21,12 +21,18 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 
 import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
 import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+
+import org.apache.parquet.Strings;

Review comment:
       These seem to be the wrong imports.

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
##########
@@ -106,14 +120,31 @@ protected void startUp() throws Exception {
         List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
         LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
         for (Properties config : jobConfigs) {
-          postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), config);
+          if (!jobsToRun.isPresent() || shouldRun(jobsToRun.get(), config)) {
+            postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), config);
+          } else {
+            LOGGER.error("Job {} has been filtered and will not be run in the cluster.", config.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          }
         }
       } else {
         LOGGER.warn("Job configuration directory " + jobConfigDir + " not found");
       }
     }
   }
 
+  @VisibleForTesting
+  /**
+   * A helper method to determine if a given job should be submitted to cluster for execution based on the
+   * regex defining the jobs to run.
+   */
+  protected static boolean shouldRun(Pattern jobsToRun, Properties jobConfig) {
+    Matcher matcher = jobsToRun.matcher(jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    if (matcher.matches()) {

Review comment:
       Could this simply be `return matcher.matches();`?

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
##########
@@ -106,14 +120,31 @@ protected void startUp() throws Exception {
         List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
         LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
         for (Properties config : jobConfigs) {
-          postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), config);
+          if (!jobsToRun.isPresent() || shouldRun(jobsToRun.get(), config)) {
+            postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), config);
+          } else {
+            LOGGER.error("Job {} has been filtered and will not be run in the cluster.", config.getProperty(ConfigurationKeys.JOB_NAME_KEY));

Review comment:
       Should this be logged at `warn` level instead of `error`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org