You are viewing a plain text version of this content. (The canonical link for it was a hyperlink that did not survive conversion to plain text.)
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/02/05 19:40:14 UTC
incubator-gobblin git commit: [GOBBLIN-404] Disable immediate
execution of all flows in FlowCatalog on Gobblin Service restart[]
Repository: incubator-gobblin
Updated Branches:
refs/heads/master de83a3fb5 -> 94bcc1694
[GOBBLIN-404] Disable immediate execution of all flows in FlowCatalog on Gobblin Service restart[]
Closes #2279 from sv2000/gaasScheduler
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/94bcc169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/94bcc169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/94bcc169
Branch: refs/heads/master
Commit: 94bcc1694ae5575cc1dcfcba12b0efab3ec8ac4e
Parents: de83a3f
Author: suvasude <su...@linkedin.biz>
Authored: Mon Feb 5 11:40:08 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Feb 5 11:40:08 2018 -0800
----------------------------------------------------------------------
.../scheduler/GobblinServiceJobScheduler.java | 67 ++++++++++++++------
.../GobblinServiceJobSchedulerTest.java | 59 +++++++++++++++++
2 files changed, 106 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bcc169/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 5c26445..9cb39fb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -23,8 +23,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -37,9 +39,11 @@ import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -77,9 +81,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
@Getter
protected volatile boolean isActive;
- public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
- Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, SchedulerService schedulerService,
- Optional<Logger> log) throws Exception {
+ public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager,
+ Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
+ SchedulerService schedulerService, Optional<Logger> log)
+ throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -90,13 +95,14 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.scheduledFlowSpecs = Maps.newHashMap();
}
- public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
- Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+ public GobblinServiceJobScheduler(Config config, Optional<HelixManager> helixManager,
+ Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService,
+ Optional<Logger> log)
+ throws Exception {
this(config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, log),
schedulerService, log);
}
-
public synchronized void setActive(boolean isActive) {
if (this.isActive == isActive) {
// No-op if already in correct state
@@ -111,7 +117,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
if (this.flowCatalog.isPresent()) {
Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate();
for (Spec spec : specs) {
- onAddSpec(spec);
+ //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
+ if (spec instanceof FlowSpec) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
}
}
}
@@ -126,8 +138,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
}
+ /**
+ * Returns a copy of {@code spec} with {@link ConfigurationKeys#FLOW_RUN_IMMEDIATELY} set to "false", so that
+ * re-adding the catalog's specs on service startup or leadership change does not request an immediate run.
+ * The URI, version, description, template URIs and child specs are carried over unchanged.
+ *
+ * @param spec the flow spec loaded from the FlowCatalog; it is not modified
+ * @return a new FlowSpec whose Config and Properties both carry FLOW_RUN_IMMEDIATELY=false
+ */
+ @VisibleForTesting
+ protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) {
+ // Copy instead of mutating: getConfigAsProperties() may hand back the spec's own Properties instance, and
+ // writing into it would leave the original spec's Config and Properties disagreeing on this flag.
+ Properties properties = new Properties();
+ properties.putAll(spec.getConfigAsProperties());
+ properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+ // Rebuild the Config from the overridden properties so both representations agree.
+ Config config = ConfigFactory.parseProperties(properties);
+ return new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties,
+ spec.getTemplateURIs(), spec.getChildSpecs());
+ }
+
@Override
- protected void startUp() throws Exception {
+ protected void startUp()
+ throws Exception {
super.startUp();
}
@@ -135,7 +158,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
* Synchronize the job scheduling because the same flowSpec can be scheduled by different threads.
*/
@Override
- public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
+ public synchronized void scheduleJob(Properties jobProps, JobListener jobListener)
+ throws JobException {
Map<String, Object> additionalJobDataMap = Maps.newHashMap();
additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC,
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)));
@@ -148,7 +172,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
@Override
- public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
+ public void runJob(Properties jobProps, JobListener jobListener)
+ throws JobException {
try {
Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
this.orchestrator.orchestrate(flowSpec);
@@ -185,10 +210,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
((FlowSpec) addedSpec).getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,"false"));
- if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
- && StringUtils.isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
- jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ ConfigUtils.getString(((FlowSpec) addedSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils
+ .isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
}
this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
@@ -223,8 +249,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
if (!isActive && helixManager.isPresent()) {
_log.info("Scheduler running in slave mode, forward Spec delete via Helix message to master: " + deletedSpecURI);
- HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE, deletedSpecURI.toString() + ":" +
- deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER, helixManager.get(), _log);
+ HelixUtils.sendUserDefinedMessage(ServiceConfigKeys.HELIX_FLOWSPEC_REMOVE,
+ deletedSpecURI.toString() + ":" + deletedSpecVersion, UUID.randomUUID().toString(), InstanceType.CONTROLLER,
+ helixManager.get(), _log);
return;
}
@@ -235,12 +262,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
unscheduleJob(deletedSpecURI.toString());
} else {
- _log.warn(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please "
- + "clean it manually", deletedSpecURI));
+ _log.warn(String.format(
+ "Spec with URI: %s was not found in cache. May be it was cleaned, if not please " + "clean it manually",
+ deletedSpecURI));
}
} catch (JobException e) {
- _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning",
- deletedSpecURI), e);
+ _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", deletedSpecURI), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/94bcc169/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
new file mode 100644
index 0000000..a6e1bc5
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.scheduler;
+
+import java.net.URI;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+
+
+/**
+ * Unit tests for {@link GobblinServiceJobScheduler}.
+ */
+public class GobblinServiceJobSchedulerTest {
+ private static final String TEST_GROUP_NAME = "testGroup";
+ private static final String TEST_FLOW_NAME = "testFlow";
+ private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
+ private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
+
+ /**
+ * Checks that disableFlowRunImmediatelyOnStart sets FLOW_RUN_IMMEDIATELY to false while carrying over the
+ * template URI, version, schedule, group and name of the original spec.
+ */
+ @Test
+ public void testDisableFlowRunImmediatelyOnStart()
+ throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "true");
+ properties.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, TEST_SCHEDULE);
+ properties.setProperty(ConfigurationKeys.JOB_GROUP_KEY, TEST_GROUP_NAME);
+ properties.setProperty(ConfigurationKeys.JOB_NAME_KEY, TEST_FLOW_NAME);
+ Config config = ConfigFactory.parseProperties(properties);
+ FlowSpec spec = FlowSpec.builder().withTemplate(new URI(TEST_TEMPLATE_URI)).withVersion("version")
+ .withConfigAsProperties(properties).withConfig(config).build();
+ FlowSpec modifiedSpec = (FlowSpec) GobblinServiceJobScheduler.disableFlowRunImmediatelyOnStart(spec);
+ // Assert presence and size explicitly: a bare for-each over the set would pass vacuously if it were empty.
+ Assert.assertTrue(modifiedSpec.getTemplateURIs().isPresent());
+ Assert.assertEquals(modifiedSpec.getTemplateURIs().get().size(), 1);
+ Assert.assertTrue(modifiedSpec.getTemplateURIs().get().contains(new URI(TEST_TEMPLATE_URI)));
+ Assert.assertEquals(modifiedSpec.getVersion(), "version");
+ Config modifiedConfig = modifiedSpec.getConfig();
+ Assert.assertFalse(modifiedConfig.getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY));
+ Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE);
+ Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_GROUP_KEY), TEST_GROUP_NAME);
+ Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME);
+ }
+}
\ No newline at end of file