You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 20:25:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-722] add option to unschedule a flow; set schedule even if the job is already scheduled

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c2312e  [GOBBLIN-722] add option to unschedule a flow; set schedule even if the job is already scheduled
1c2312e is described below

commit 1c2312e5b1490f99ccbcc9372e6974cb539c50a8
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Apr 4 13:25:32 2019 -0700

    [GOBBLIN-722] add option to unschedule a flow
    set schedule even if the job is already scheduled
    
    Closes #2589 from arjun4084346/unscheduleFlow
---
 .../gobblin/configuration/ConfigurationKeys.java     |  1 +
 .../org/apache/gobblin/service/FlowConfigTest.java   | 20 ++++++++++++++++++++
 .../service/FlowConfigResourceLocalHandler.java      | 11 +++++++++++
 .../org/apache/gobblin/scheduler/JobScheduler.java   |  4 ++--
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c6ba28d..75fbc35 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -137,6 +137,7 @@ public class ConfigurationKeys {
   public static final String FLOW_APPLY_INPUT_RETENTION = "flow.applyInputRetention";
   public static final String FLOW_ALLOW_CONCURRENT_EXECUTION = "flow.allowConcurrentExecution";
   public static final String FLOW_EXPLAIN_KEY = "flow.explain";
+  public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
 
   /**
    * Common topology configuration properties.
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 3a15cb4..1e549e8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -222,6 +222,26 @@ public class FlowConfigTest {
   }
 
   @Test (dependsOnMethods = "testUpdate")
+  public void testUnschedule() throws Exception {
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    flowProperties.put(ConfigurationKeys.FLOW_UNSCHEDULE_KEY, "true");
+
+    FlowConfig flowConfig = new FlowConfig().setId(flowId)
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).
+            setRunImmediately(true))
+        .setProperties(new StringMap(flowProperties));
+
+    _client.updateFlowConfig(flowConfig);
+
+    FlowConfig persistedFlowConfig = _client.getFlowConfig(flowId);
+
+    Assert.assertFalse(persistedFlowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_UNSCHEDULE_KEY));
+    Assert.assertEquals(persistedFlowConfig.getSchedule().getCronSchedule(), FlowConfigResourceLocalHandler.NEVER_RUN_CRON_SCHEDULE.getCronSchedule());
+  }
+
+  @Test (dependsOnMethods = "testUnschedule")
   public void testDelete() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 9b05767..a82e760 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -48,6 +48,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
  */
 @Slf4j
 public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+  public static final Schedule NEVER_RUN_CRON_SCHEDULE = new Schedule().setCronSchedule("0 0 0 ? 1 1 2050");
   @Getter
   protected FlowCatalog flowCatalog;
   public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
@@ -142,11 +143,21 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
           "flowName and flowGroup cannot be changed in update", null);
     }
+    if (isUnscheduleRequest(flowConfig)) {
+      // flow config is not changed if it is just a request to un-schedule
+      FlowConfig originalFlowConfig = getFlowConfig(flowId);
+      originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
+      flowConfig = originalFlowConfig;
+    }
 
     this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
     return new UpdateResponse(HttpStatus.S_200_OK);
   }
 
+  private boolean isUnscheduleRequest(FlowConfig flowConfig) {
+    return Boolean.parseBoolean(flowConfig.getProperties().getOrDefault(ConfigurationKeys.FLOW_UNSCHEDULE_KEY, "false"));
+  }
+
   /**
    * Update flowConfig locally and trigger all listeners
    */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index c737431..a675d99 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -351,8 +351,8 @@ public class JobScheduler extends AbstractIdleService {
     String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
 
     if (this.scheduledJobs.containsKey(jobName)) {
-      LOG.warn("Job " + jobName + " has already been scheduled");
-      return;
+      LOG.info("Job " + jobName + " was already scheduled, un-scheduling it now.");
+      unscheduleJob(jobName);
     }
 
     // Check if the job has been disabled