You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:34 UTC

[16/50] incubator-gobblin git commit: [GOBBLIN-404] Disable immediate execution of all flows in FlowCatalog on Gobblin Service restart[]

[GOBBLIN-404] Disable immediate execution of all flows in FlowCatalog on Gobblin Service restart[]

Closes #2279 from sv2000/gaasScheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/94bcc169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/94bcc169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/94bcc169

Branch: refs/heads/0.12.0
Commit: 94bcc1694ae5575cc1dcfcba12b0efab3ec8ac4e
Parents: de83a3f
Author: suvasude <su...@linkedin.biz>
Authored: Mon Feb 5 11:40:08 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Feb 5 11:40:08 2018 -0800

----------------------------------------------------------------------
 .../scheduler/GobblinServiceJobScheduler.java   | 67 ++++++++++++++------
 .../GobblinServiceJobSchedulerTest.java         | 59 +++++++++++++++++
 2 files changed, 106 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bcc169/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 5c26445..9cb39fb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -23,8 +23,10 @@ import java.util.Map;
 import java.util.Properties;
 
 import java.util.UUID;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -37,9 +39,11 @@ import org.quartz.UnableToInterruptJobException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -77,9 +81,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   @Getter
   protected volatile boolean isActive;
 
-  public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
-      Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, SchedulerService schedulerService,
-      Optional<Logger> log) throws Exception {
+  public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager,
+      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
+      SchedulerService schedulerService, Optional<Logger> log)
+      throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -90,13 +95,14 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     this.scheduledFlowSpecs = Maps.newHashMap();
   }
 
-  public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
-      Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+  public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager,
+      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService,
+      Optional<Logger> log)
+      throws Exception {
     this(config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, log),
         schedulerService, log);
   }
 
-
   public synchronized void setActive(boolean isActive) {
     if (this.isActive == isActive) {
       // No-op if already in correct state
@@ -111,7 +117,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       if (this.flowCatalog.isPresent()) {
         Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate();
         for (Spec spec : specs) {
-          onAddSpec(spec);
+          //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
+          if (spec instanceof FlowSpec) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
         }
       }
     }
@@ -126,8 +138,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
   }
 
+  @VisibleForTesting
+  protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) {
+    Properties properties = spec.getConfigAsProperties();
+    properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+    Config config = ConfigFactory.parseProperties(properties);
+    FlowSpec flowSpec = new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties,
+        spec.getTemplateURIs(), spec.getChildSpecs());
+    return flowSpec;
+  }
+
   @Override
-  protected void startUp() throws Exception {
+  protected void startUp()
+      throws Exception {
     super.startUp();
   }
 
@@ -135,7 +158,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads.
    */
   @Override
-  public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
+  public synchronized void scheduleJob(Properties jobProps, JobListener jobListener)
+      throws JobException {
     Map<String, Object> additionalJobDataMap = Maps.newHashMap();
     additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC,
         this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)));
@@ -148,7 +172,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   }
 
   @Override
-  public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
+  public void runJob(Properties jobProps, JobListener jobListener)
+      throws JobException {
     try {
       Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       this.orchestrator.orchestrate(flowSpec);
@@ -185,10 +210,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
             ((FlowSpec) addedSpec).getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
         jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,"false"));
-        if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
-            && StringUtils.isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
-          jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+            ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
+        if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils
+            .isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+          jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
+              flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
         }
 
         this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
@@ -223,8 +249,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     if (!isActive && helixManager.isPresent()) {
       _log.info("Scheduler running in slave mode, forward Spec delete via Helix message to master: " + deletedSpecURI);
-      HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, deletedSpecURI.toString() + ":" +
-              deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER, helixManager.get(), _log);
+      HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE,
+          deletedSpecURI.toString() + ":" + deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER,
+          helixManager.get(), _log);
       return;
     }
 
@@ -235,12 +262,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
         unscheduleJob(deletedSpecURI.toString());
       } else {
-        _log.warn(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please "
-                + "clean it manually", deletedSpecURI));
+        _log.warn(String.format(
+            "Spec with URI: %s was not found in cache. May be it was cleaned, if not please " + "clean it manually",
+            deletedSpecURI));
       }
     } catch (JobException e) {
-      _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning",
-          deletedSpecURI), e);
+      _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", deletedSpecURI), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bcc169/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
new file mode 100644
index 0000000..a6e1bc5
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.scheduler;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+
+
+public class GobblinServiceJobSchedulerTest {
+  private static final String TEST_GROUP_NAME = "testGroup";
+  private static final String TEST_FLOW_NAME = "testFlow";
+  private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
+  private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
+
+  @Test
+  public void testDisableFlowRunImmediatelyOnStart()
+      throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "true");
+    properties.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, TEST_SCHEDULE);
+    properties.setProperty(ConfigurationKeys.JOB_GROUP_KEY, TEST_GROUP_NAME);
+    properties.setProperty(ConfigurationKeys.JOB_NAME_KEY, TEST_FLOW_NAME);
+    Config config = ConfigFactory.parseProperties(properties);
+    FlowSpec spec = FlowSpec.builder().withTemplate(new URI(TEST_TEMPLATE_URI)).withVersion("version")
+        .withConfigAsProperties(properties).withConfig(config).build();
+    FlowSpec modifiedSpec = (FlowSpec) GobblinServiceJobScheduler.disableFlowRunImmediatelyOnStart(spec);
+    for (URI templateURI : modifiedSpec.getTemplateURIs().get()) {
+      Assert.assertEquals(templateURI.toString(), TEST_TEMPLATE_URI);
+    }
+    Assert.assertEquals(modifiedSpec.getVersion(), "version");
+    Config modifiedConfig = modifiedSpec.getConfig();
+    Assert.assertFalse(modifiedConfig.getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY));
+    Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_GROUP_KEY), TEST_GROUP_NAME);
+    Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME);
+  }
+}
\ No newline at end of file