You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/24 22:01:00 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-701] Add secure templates (duplicate of #2571)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d260ab  [GOBBLIN-701] Add secure templates (duplicate of #2571)
7d260ab is described below

commit 7d260ab2c32a773e317a31cd264f5c97d2d48a10
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Sep 24 15:00:52 2019 -0700

    [GOBBLIN-701] Add secure templates (duplicate of #2571)
    
    Closes #2739 from jack-moseley/secure-template
---
 .../gobblin/aws/AWSJobConfigurationManager.java    |   9 +-
 .../gobblin/cluster/JobConfigurationManager.java   |  10 +-
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java |  13 +-
 .../orchestration/AzkabanProjectConfigTest.java    |  12 +-
 .../org/apache/gobblin/runtime/api/JobSpec.java    |  15 +-
 .../apache/gobblin/runtime/api/JobTemplate.java    |   3 +-
 .../gobblin/runtime/api/SecureJobTemplate.java     |  69 +++++++++
 .../gobblin/runtime/embedded/EmbeddedGobblin.java  |  12 +-
 .../runtime/job_spec/JobResolutionCallbacks.java   |  42 ++++++
 .../gobblin/runtime/job_spec/JobSpecResolver.java  | 167 +++++++++++++++++++++
 .../gobblin/runtime/job_spec/ResolvedJobSpec.java  |  18 ++-
 .../runtime/job_spec/SecureTemplateEnforcer.java   |  43 ++++++
 .../runtime/template/StaticJobTemplate.java        |  24 ++-
 .../org/apache/gobblin/scheduler/JobScheduler.java |   9 +-
 .../PathAlterationListenerAdaptorForMonitor.java   |   7 +-
 .../org/apache/gobblin/util/SchedulerUtils.java    |  30 ++--
 .../gobblin/runtime/api/SecureJobTemplateTest.java |  79 ++++++++++
 .../runtime/job_spec/JobSpecResolverTest.java      | 100 ++++++++++++
 .../runtime/template/StaticJobTemplateTest.java    |  25 +++
 .../apache/gobblin/util/SchedulerUtilsTest.java    |  18 ++-
 .../modules/template/StaticFlowTemplate.java       |  28 +++-
 21 files changed, 678 insertions(+), 55 deletions(-)

diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
index 042503f..cc99373 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +88,7 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
   private final long refreshIntervalInSeconds;
 
   private final ScheduledExecutorService fetchJobConfExecutor;
+  private final JobSpecResolver jobSpecResolver;
 
   public AWSJobConfigurationManager(EventBus eventBus, Config config) {
     super(eventBus, config);
@@ -100,6 +102,11 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
 
     this.fetchJobConfExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobConfExecutor")));
+    try {
+      this.jobSpecResolver = JobSpecResolver.builder(config).build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
   }
 
   private void fetchJobConfSettings() {
@@ -155,7 +162,7 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
         final Properties properties = ConfigUtils.configToProperties(this.config);
         properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, jobConfigDir.getAbsolutePath());
 
-        final List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties);
+        final List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
         LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
         for (Properties config : jobConfigs) {
           LOGGER.debug("Config value: " + config);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index 34ff439..c2ba8a4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -18,9 +18,11 @@
 package org.apache.gobblin.cluster;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +68,7 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
   protected final EventBus eventBus;
   protected final Config config;
   protected Optional<String> jobConfDirPath;
+  protected final JobSpecResolver jobSpecResolver;
 
   public JobConfigurationManager(EventBus eventBus, Config config) {
     this.eventBus = eventBus;
@@ -74,6 +77,11 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
     this.jobConfDirPath =
         config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY) ? Optional
             .of(config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) : Optional.<String>absent();
+    try {
+      this.jobSpecResolver = JobSpecResolver.builder(config).build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
   }
 
   @Override
@@ -94,7 +102,7 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
         LOGGER.info("Loading job configurations from " + jobConfigDir);
         Properties properties = ConfigUtils.configToProperties(this.config);
         properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, "file://" + jobConfigDir.getAbsolutePath());
-        List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties);
+        List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
         LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
         for (Properties config : jobConfigs) {
           postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), config);
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 05c1bb9..6b99f81 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -32,6 +32,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Level;
@@ -66,7 +69,6 @@ import org.apache.gobblin.runtime.JobLauncherFactory;
 import org.apache.gobblin.runtime.app.ApplicationException;
 import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
 import org.apache.gobblin.runtime.listeners.CompositeJobListener;
 import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
@@ -199,10 +201,13 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
 
     Properties jobProps = this.props;
     if (jobProps.containsKey(TEMPLATE_KEY)) {
+      Config config = ConfigUtils.propertiesToConfig(jobProps);
+      JobSpecResolver resolver = JobSpecResolver.builder(config).build();
+
       URI templateUri = new URI(jobProps.getProperty(TEMPLATE_KEY));
-      Config resolvedJob = new PackagedTemplatesJobCatalogDecorator().getTemplate(templateUri)
-          .getResolvedConfig(ConfigUtils.propertiesToConfig(jobProps));
-      jobProps = ConfigUtils.configToProperties(resolvedJob);
+      JobSpec jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
+      ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
+      jobProps = ConfigUtils.configToProperties(resolvedJob.getConfig());
     }
 
     GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
index 3a48806..f9f83d3 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
@@ -40,7 +40,7 @@ public class AzkabanProjectConfigTest {
 
     Properties properties = new Properties();
     JobSpec jobSpec = new JobSpec(new URI("uri"), "0.0", "test job spec",
-        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
 
     String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -55,7 +55,7 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
     JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
-        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
 
     String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -70,7 +70,8 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
     JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
-        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(),
+        Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
 
     String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -85,7 +86,7 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
     JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
-        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+        ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
 
     String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
@@ -100,7 +101,8 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
     JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
-        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(),
+        Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
 
     String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 203ea8d..a6ca903 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -67,6 +67,8 @@ public class JobSpec implements Configurable, Spec {
   Properties configAsProperties;
   /** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
   Optional<URI> templateURI;
+  /** Specific instance of template. */
+  transient Optional<JobTemplate> jobTemplate;
 
   /** Metadata can contain properties which are not a part of config, e.g. Verb */
   Map<String, String> metadata;
@@ -141,6 +143,7 @@ public class JobSpec implements Configurable, Spec {
     private Optional<String> description = Optional.absent();
     private Optional<URI> jobCatalogURI = Optional.absent();
     private Optional<URI> templateURI = Optional.absent();
+    private Optional<JobTemplate> jobTemplate = Optional.absent();
     private Optional<Map> metadata = Optional.absent();
 
     public Builder(URI jobSpecUri) {
@@ -167,7 +170,7 @@ public class JobSpec implements Configurable, Spec {
       Preconditions.checkNotNull(this.uri);
       Preconditions.checkNotNull(this.version);
       return new JobSpec(getURI(), getVersion(), getDescription(), getConfig(),
-                         getConfigAsProperties(), getTemplateURI(), getMetadata());
+                         getConfigAsProperties(), getTemplateURI(), getTemplate(), getMetadata());
     }
 
     /** The scheme and authority of the job catalog URI are used to generate JobSpec URIs from
@@ -301,6 +304,16 @@ public class JobSpec implements Configurable, Spec {
       return this;
     }
 
+    public Optional<JobTemplate> getTemplate() {
+      return this.jobTemplate;
+    }
+
+    public Builder withTemplate(JobTemplate template) {
+      Preconditions.checkNotNull(template);
+      this.jobTemplate = Optional.of(template);
+      return this;
+    }
+
     public Map getDefaultMetadata() {
       log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'.");
       return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
index 1be22fc..b6460d9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
@@ -54,7 +54,8 @@ public interface JobTemplate extends Spec {
 
   /**
    * Return the combine configuration of template and user customized attributes.
-   * @return
+   *
+   * Note: do not use this method directly; instead use {@link org.apache.gobblin.runtime.job_spec.ResolvedJobSpec}.
    */
   Config getResolvedConfig(Config userProps) throws SpecNotFoundException, TemplateException;
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SecureJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SecureJobTemplate.java
new file mode 100644
index 0000000..c6386d3
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SecureJobTemplate.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * A secure template is a {@link JobTemplate} which only allows the user configuration to specify a static set of
+ * keys. This makes most of the template non-overridable, so that how a job executes can be tightly controlled.
+ */
+public interface SecureJobTemplate extends JobTemplate {
+
+	/**
+	 * Filter the user config to only preserve the keys allowed by a secure template.
+	 */
+	static Config filterUserConfig(SecureJobTemplate template, Config userConfig, Logger logger) {
+		if (!template.isSecure()) {
+			return userConfig;
+		}
+		Config survivingConfig = ConfigFactory.empty();
+		for (String key : template.overridableProperties()) {
+			if (userConfig.hasPath(key)) {
+				survivingConfig = survivingConfig.withValue(key, userConfig.getValue(key));
+				userConfig = userConfig.withoutPath(key);
+			}
+		}
+
+		if (!userConfig.isEmpty()) {
+			logger.warn(String.format("Secure template %s ignored the following keys because they are not overridable: %s",
+					template.getUri().toString(),
+					userConfig.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.joining(", "))));
+		}
+
+		return survivingConfig;
+	}
+
+	/**
+	 * @return Whether the template is secure.
+	 */
+	boolean isSecure();
+
+	/**
+	 * @return If the template is secure, the collection of keys that can be overridden by the user.
+	 */
+	Collection<String> overridableProperties();
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index dced320..1a1e858 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.avro.SchemaBuilder;
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -439,19 +440,16 @@ public class EmbeddedGobblin {
     } else {
       Config finalConfig = userConfig.withFallback(sysProps);
       if (this.template != null) {
-        try {
-          finalConfig = this.template.getResolvedConfig(finalConfig);
-        } catch (SpecNotFoundException | JobTemplate.TemplateException exc) {
-          throw new RuntimeException(exc);
-        }
+        this.specBuilder.withTemplate(this.template);
       }
       jobSpec = this.specBuilder.withConfig(finalConfig).build();
     }
 
     ResolvedJobSpec resolvedJobSpec;
     try {
-      resolvedJobSpec = new ResolvedJobSpec(jobSpec);
-    } catch (SpecNotFoundException | JobTemplate.TemplateException exc) {
+      JobSpecResolver resolver = JobSpecResolver.builder(sysProps).build();
+      resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException exc) {
       throw new RuntimeException("Failed to resolved template.", exc);
     }
     final JobCatalog jobCatalog = new StaticJobCatalog(Optional.of(this.useLog), Lists.<JobSpec>newArrayList(resolvedJobSpec));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobResolutionCallbacks.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobResolutionCallbacks.java
new file mode 100644
index 0000000..5bd72ee
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobResolutionCallbacks.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+
+
+/**
+ * Callbacks executed during job resolution by {@link JobSpecResolver}.
+ */
+@Alpha
+public interface JobResolutionCallbacks {
+
+	/**
+	 * Called before a job is resolved, providing the original job spec and template to be used.
+	 */
+	void beforeResolution(JobSpecResolver jobSpecResolver, JobSpec jobSpec, JobTemplate jobTemplate)
+			throws JobTemplate.TemplateException;
+
+	/**
+	 * Called after a job is resolved, providing the final resolved spec.
+	 */
+	void afterResolution(JobSpecResolver jobSpecResolver, ResolvedJobSpec resolvedJobSpec);
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobSpecResolver.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobSpecResolver.java
new file mode 100644
index 0000000..455e18d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobSpecResolver.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_catalog.InMemoryJobCatalog;
+import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+
+
+/**
+ * Resolves {@link JobSpec}s into {@link ResolvedJobSpec}s. This means:
+ * - Applying the template to obtain the final job config.
+ * - Calling resolution callbacks.
+ */
+@AllArgsConstructor
+public class JobSpecResolver {
+
+	public static final String JOB_RESOLUTION_ACTIONS_KEY = "org.apache.gobblin.jobResolution.actions";
+
+	private final JobCatalogWithTemplates jobCatalog;
+	private final List<JobResolutionCallbacks> jobResolutionCallbacks;
+	private final Config sysConfig;
+
+	/**
+	 * Obtain a mock {@link JobSpecResolver} with an empty configuration. It does not run any callbacks.
+	 * @return a {@link JobSpecResolver} built from an empty system configuration.
+	 */
+	public static JobSpecResolver mock() {
+		try {
+			return new Builder().sysConfig(ConfigFactory.empty()).build();
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unexpected error. This is an error in code.", ioe);
+		}
+	}
+
+	/**
+	 * @return a Builder for {@link JobSpecResolver}.
+	 */
+	public static Builder builder(GobblinInstanceDriver driver) throws IOException {
+		return new Builder().sysConfig(driver.getSysConfig().getConfig()).jobCatalog(driver.getJobCatalog());
+	}
+
+	/**
+	 * @return a Builder for {@link JobSpecResolver}.
+	 */
+	public static Builder builder(Config sysConfig) throws IOException {
+		return new Builder().sysConfig(sysConfig);
+	}
+
+	/**
+	 * Used for building {@link JobSpecResolver}.
+	 */
+	public static class Builder {
+		private JobCatalogWithTemplates jobCatalog = new PackagedTemplatesJobCatalogDecorator(new InMemoryJobCatalog());
+		private List<JobResolutionCallbacks> jobResolutionCallbacks = new ArrayList<>();
+		private Config sysConfig;
+
+		private Builder sysConfig(Config sysConfig) throws IOException {
+			this.sysConfig = sysConfig;
+			for (String action : Splitter.on(",").trimResults().omitEmptyStrings()
+					.split(ConfigUtils.getString(sysConfig, JOB_RESOLUTION_ACTIONS_KEY, ""))) {
+				try {
+					ClassAliasResolver<JobResolutionCallbacks> resolver = new ClassAliasResolver<>(JobResolutionCallbacks.class);
+					JobResolutionCallbacks actionInstance = resolver.resolveClass(action).newInstance();
+					this.jobResolutionCallbacks.add(actionInstance);
+				} catch (ReflectiveOperationException roe) {
+					throw new IOException(roe);
+				}
+			}
+			return this;
+		}
+
+		/**
+		 * Set the job catalog where templates may be found.
+		 */
+		public Builder jobCatalog(JobCatalog jobCatalog) {
+			this.jobCatalog = new PackagedTemplatesJobCatalogDecorator(jobCatalog);
+			return this;
+		}
+
+		/**
+		 * Add a {@link JobResolutionCallbacks} to the resolution process.
+		 */
+		public Builder jobResolutionAction(JobResolutionCallbacks action) {
+			this.jobResolutionCallbacks.add(action);
+			return this;
+		}
+
+		/**
+		 * @return a {@link JobSpecResolver}
+		 */
+		public JobSpecResolver build() {
+			return new JobSpecResolver(this.jobCatalog, this.jobResolutionCallbacks, this.sysConfig);
+		}
+	}
+
+	/**
+	 * Resolve an input {@link JobSpec} applying any templates.
+	 *
+	 * @throws SpecNotFoundException If no template exists with the specified URI.
+	 * @throws JobTemplate.TemplateException If the template throws an exception during resolution.
+	 * @throws ConfigException If the job config cannot be resolved.
+	 */
+	public ResolvedJobSpec resolveJobSpec(JobSpec jobSpec)
+			throws SpecNotFoundException, JobTemplate.TemplateException, ConfigException {
+		if (jobSpec instanceof ResolvedJobSpec) {
+			return (ResolvedJobSpec) jobSpec;
+		}
+
+		JobTemplate jobTemplate = null;
+		if (jobSpec.getJobTemplate().isPresent()) {
+			jobTemplate = jobSpec.getJobTemplate().get();
+		} else if (jobSpec.getTemplateURI().isPresent()) {
+			jobTemplate = jobCatalog.getTemplate(jobSpec.getTemplateURI().get());
+		}
+
+		for (JobResolutionCallbacks action : this.jobResolutionCallbacks) {
+			action.beforeResolution(this, jobSpec, jobTemplate);
+		}
+		Config resolvedConfig;
+		if (jobTemplate == null) {
+			resolvedConfig = jobSpec.getConfig().resolve();
+		} else {
+			resolvedConfig = jobTemplate.getResolvedConfig(jobSpec.getConfig()).resolve();
+		}
+		ResolvedJobSpec resolvedJobSpec = new ResolvedJobSpec(jobSpec, resolvedConfig);
+		for (JobResolutionCallbacks action : this.jobResolutionCallbacks) {
+			action.afterResolution(this, resolvedJobSpec);
+		}
+
+		return resolvedJobSpec;
+	}
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
index b64f178..795e3b4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
@@ -44,10 +44,18 @@ public class ResolvedJobSpec extends JobSpec {
   @Getter
   private final JobSpec originalJobSpec;
 
+  /**
+   * @deprecated Use {@link JobSpecResolver}
+   */
+  @Deprecated
   public ResolvedJobSpec(JobSpec other) throws SpecNotFoundException, JobTemplate.TemplateException {
     this(other, new InMemoryJobCatalog());
   }
 
+  /**
+   * @deprecated Use {@link JobSpecResolver}
+   */
+  @Deprecated
   public ResolvedJobSpec(JobSpec other, GobblinInstanceDriver driver)
       throws SpecNotFoundException, JobTemplate.TemplateException {
     this(other, driver.getJobCatalog());
@@ -55,11 +63,19 @@ public class ResolvedJobSpec extends JobSpec {
 
   /**
    * Resolve the job spec using classpath templates as well as any templates available in the input {@link JobCatalog}.
+   * @deprecated Use {@link JobSpecResolver}
    */
+  @Deprecated
   public ResolvedJobSpec(JobSpec other, JobCatalog catalog)
       throws SpecNotFoundException, JobTemplate.TemplateException {
     super(other.getUri(), other.getVersion(), other.getDescription(), resolveConfig(other, catalog),
-        ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI(), other.getMetadata());
+        ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI(), other.getJobTemplate(), other.getMetadata());
+    this.originalJobSpec = other;
+  }
+
+  ResolvedJobSpec(JobSpec other, Config resolvedConfig) {
+    super(other.getUri(), other.getVersion(), other.getDescription(), resolvedConfig,
+        ConfigUtils.configToProperties(resolvedConfig), other.getTemplateURI(), other.getJobTemplate(), other.getMetadata());
     this.originalJobSpec = other;
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/SecureTemplateEnforcer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/SecureTemplateEnforcer.java
new file mode 100644
index 0000000..2605c05
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/SecureTemplateEnforcer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SecureJobTemplate;
+
+
+/**
+ * A {@link JobResolutionCallbacks} that throws a {@link JobTemplate.TemplateException} before resolution unless the job uses a secure template.
+ */
+public class SecureTemplateEnforcer implements JobResolutionCallbacks {
+
+	@Override
+	public void beforeResolution(JobSpecResolver jobSpecResolver, JobSpec jobSpec, JobTemplate jobTemplate)
+			throws JobTemplate.TemplateException {
+		if (!(jobTemplate instanceof SecureJobTemplate) || !((SecureJobTemplate) jobTemplate).isSecure()) {
+			throw new JobTemplate.TemplateException(String.format("Unallowed template resolution. %s is not a secure template.",
+					jobTemplate == null ? "null" : jobTemplate.getUri()));
+		}
+	}
+
+	@Override
+	public void afterResolution(JobSpecResolver jobSpecResolver, ResolvedJobSpec resolvedJobSpec) {
+		// No-op: the secure-template check happens entirely in beforeResolution.
+	}
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
index 6c69370..83e65f1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -33,17 +34,23 @@ import com.typesafe.config.Config;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
 import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SecureJobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 
 /**
  * A {@link JobTemplate} using a static {@link Config} as the raw configuration for the template.
  */
-public class StaticJobTemplate extends InheritingJobTemplate {
+@Slf4j
+public class StaticJobTemplate extends InheritingJobTemplate implements SecureJobTemplate {
 
   public static final String SUPER_TEMPLATE_KEY = "gobblin.template.inherit";
+  public static final String IS_SECURE_KEY = "gobblin.template.isSecure";
+  public static final String SECURE_OVERRIDABLE_PROPERTIES_KEYS = "gobblin.template.secure.overridableProperties";
 
   private Config rawConfig;
   private Set<String> requiredAttributes;
@@ -80,7 +87,7 @@ public class StaticJobTemplate extends InheritingJobTemplate {
       throws TemplateException {
     if (config.hasPath(SUPER_TEMPLATE_KEY)) {
       List<URI> uris = Lists.newArrayList();
-      for (String uriString : config.getString(SUPER_TEMPLATE_KEY).split(",")) {
+      for (String uriString : ConfigUtils.getStringList(config, SUPER_TEMPLATE_KEY)) {
         try {
           uris.add(new URI(uriString));
         } catch (URISyntaxException use) {
@@ -105,6 +112,17 @@ public class StaticJobTemplate extends InheritingJobTemplate {
 
   @Override
   protected Config getLocallyResolvedConfig(Config userConfig) {
-    return userConfig.withFallback(this.rawConfig);
+    Config filteredUserConfig = SecureJobTemplate.filterUserConfig(this, userConfig, log);
+    return filteredUserConfig.withFallback(this.rawConfig);
+  }
+
+  @Override
+  public boolean isSecure() {
+    return ConfigUtils.getBoolean(this.rawConfig, IS_SECURE_KEY, false);
+  }
+
+  @Override
+  public Collection<String> overridableProperties() {
+    return isSecure() ? ConfigUtils.getStringList(this.rawConfig, SECURE_OVERRIDABLE_PROPERTIES_KEYS) : Collections.emptyList();
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index a675d99..fe3640a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -33,7 +33,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.gobblin.source.Source;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.fs.Path;
 
 import org.quartz.CronScheduleBuilder;
@@ -137,6 +139,9 @@ public class JobScheduler extends AbstractIdleService {
   private final Closer closer = Closer.create();
 
   @Getter
+  private final JobSpecResolver jobSpecResolver;
+
+  @Getter
   private volatile boolean cancelRequested = false;
 
   public JobScheduler(Properties properties, SchedulerService scheduler)
@@ -171,6 +176,8 @@ public class JobScheduler extends AbstractIdleService {
       this.jobConfigFileDirPath = null;
       this.listener = null;
     }
+
+    this.jobSpecResolver = JobSpecResolver.builder(ConfigUtils.propertiesToConfig(properties)).build();
   }
 
   @Override
@@ -523,7 +530,7 @@ public class JobScheduler extends AbstractIdleService {
    */
   private List<Properties> loadGeneralJobConfigs()
       throws ConfigurationException, IOException {
-    List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(this.properties);
+    List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(this.properties, this.jobSpecResolver);
     LOG.info(String.format("Loaded %d job configurations", jobConfigs.size()));
     return jobConfigs;
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java
index 0b705e0..a4d837a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -51,11 +52,13 @@ public class PathAlterationListenerAdaptorForMonitor extends PathAlterationListe
   JobScheduler jobScheduler;
   /** Store path to job mappings. Required for correctly unscheduling. */
   private final Map<Path, String> jobNameMap;
+  private final JobSpecResolver jobSpecResolver;
 
   PathAlterationListenerAdaptorForMonitor(Path jobConfigFileDirPath, JobScheduler jobScheduler) {
     this.jobConfigFileDirPath = jobConfigFileDirPath;
     this.jobScheduler = jobScheduler;
     this.jobNameMap = Maps.newConcurrentMap();
+    this.jobSpecResolver = jobScheduler.getJobSpecResolver();
   }
 
   private Path getJobPath(Properties jobProps) {
@@ -73,7 +76,7 @@ public class PathAlterationListenerAdaptorForMonitor extends PathAlterationListe
     String customizedInfo = "";
     try {
       Properties jobProps =
-          SchedulerUtils.loadGenericJobConfig(this.jobScheduler.properties, path, jobConfigFileDirPath);
+          SchedulerUtils.loadGenericJobConfig(this.jobScheduler.properties, path, jobConfigFileDirPath, this.jobSpecResolver);
       LOG.debug("Loaded job properties: {}", jobProps);
       switch (action) {
         case SCHEDULE:
@@ -103,7 +106,7 @@ public class PathAlterationListenerAdaptorForMonitor extends PathAlterationListe
     String customizedInfoResult = "";
     try {
       for (Properties jobProps : SchedulerUtils.loadGenericJobConfigs(jobScheduler.properties, path,
-          jobConfigFileDirPath)) {
+          jobConfigFileDirPath, this.jobSpecResolver)) {
         try {
           switch (action) {
             case SCHEDULE:
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
index 1174c62..efb3c2e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
@@ -25,6 +25,8 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -65,7 +67,7 @@ public class SchedulerUtils {
    * @param sysProps Gobblin framework configuration properties
    * @return a list of job configurations in the form of {@link java.util.Properties}
    */
-  public static List<Properties> loadGenericJobConfigs(Properties sysProps)
+  public static List<Properties> loadGenericJobConfigs(Properties sysProps, JobSpecResolver resolver)
       throws ConfigurationException, IOException {
     Path rootPath = new Path(sysProps.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY));
     PullFileLoader loader = new PullFileLoader(rootPath, rootPath.getFileSystem(new Configuration()),
@@ -77,7 +79,7 @@ public class SchedulerUtils {
     List<Properties> jobConfigs = Lists.newArrayList();
     for (Config config : configs) {
       try {
-        jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config)));
+        jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
       } catch (IOException ioe) {
         LOGGER.error("Could not parse job config at " + ConfigUtils.getString(config,
             ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, "Unknown path"), ioe);
@@ -96,7 +98,7 @@ public class SchedulerUtils {
    * @return a list of job configurations in the form of {@link java.util.Properties}
    */
   public static List<Properties> loadGenericJobConfigs(Properties sysProps, Path commonPropsPath,
-      Path jobConfigPathDir)
+      Path jobConfigPathDir, JobSpecResolver resolver)
       throws ConfigurationException, IOException {
 
     PullFileLoader loader = new PullFileLoader(jobConfigPathDir, jobConfigPathDir.getFileSystem(new Configuration()),
@@ -108,7 +110,7 @@ public class SchedulerUtils {
     List<Properties> jobConfigs = Lists.newArrayList();
     for (Config config : configs) {
       try {
-        jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config)));
+        jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
       } catch (IOException ioe) {
         LOGGER.error("Could not parse job config at " + ConfigUtils.getString(config,
             ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, "Unknown path"), ioe);
@@ -126,15 +128,15 @@ public class SchedulerUtils {
    * @param jobConfigPathDir root job configuration file directory
    * @return a job configuration in the form of {@link java.util.Properties}
    */
-  public static Properties loadGenericJobConfig(Properties sysProps, Path jobConfigPath, Path jobConfigPathDir)
-      throws ConfigurationException, IOException {
+  public static Properties loadGenericJobConfig(Properties sysProps, Path jobConfigPath, Path jobConfigPathDir,
+      JobSpecResolver resolver) throws ConfigurationException, IOException {
 
     PullFileLoader loader = new PullFileLoader(jobConfigPathDir, jobConfigPathDir.getFileSystem(new Configuration()),
         getJobConfigurationFileExtensions(sysProps), PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
 
     Config sysConfig = ConfigUtils.propertiesToConfig(sysProps);
     Config config = loader.loadPullFile(jobConfigPath, sysConfig, true);
-    return resolveTemplate(ConfigUtils.configToProperties(config));
+    return resolveTemplate(ConfigUtils.configToProperties(config), resolver);
   }
 
   /**
@@ -168,18 +170,16 @@ public class SchedulerUtils {
     }));
   }
 
-  private static Properties resolveTemplate(Properties jobProps) throws IOException {
+  private static Properties resolveTemplate(Properties jobProps, JobSpecResolver resolver) throws IOException {
     try {
+      JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(ConfigUtils.propertiesToConfig(jobProps));
       if (jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
-        Config jobConfig = ConfigUtils.propertiesToConfig(jobProps);
-        Properties resolvedProps = ConfigUtils.configToProperties((ResourceBasedJobTemplate
+        JobTemplate jobTemplate = ResourceBasedJobTemplate
             .forResourcePath(jobProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH),
-                new PackagedTemplatesJobCatalogDecorator()))
-            .getResolvedConfig(jobConfig));
-        return resolvedProps;
-      } else {
-        return jobProps;
+                new PackagedTemplatesJobCatalogDecorator());
+        jobSpecBuilder.withTemplate(jobTemplate);
       }
+      return ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpecBuilder.build()).getConfig());
     } catch (JobTemplate.TemplateException | SpecNotFoundException | URISyntaxException exc) {
       throw new IOException(exc);
     }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/SecureJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/SecureJobTemplateTest.java
new file mode 100644
index 0000000..4b30f89
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/SecureJobTemplateTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class SecureJobTemplateTest {
+
+	@Test
+	public void test() {
+
+		SecureJobTemplate template = Mockito.mock(SecureJobTemplate.class);
+		Mockito.when(template.getUri()).thenReturn(URI.create("my://template"));
+		Mockito.when(template.isSecure()).thenReturn(true);
+		Mockito.when(template.overridableProperties()).thenReturn(Lists.newArrayList("my.overridable.property1", "my.overridable.property2"));
+
+		// Test simple filtering
+		Config config = ConfigFactory.parseMap(ImmutableMap.of(
+				"someProperty", "foo",
+				"my.overridable.property1", "bar"
+		));
+		Config result = SecureJobTemplate.filterUserConfig(template, config, log);
+		Assert.assertEquals(result.entrySet().size(), 1);
+		Assert.assertEquals(result.getString("my.overridable.property1"), "bar");
+		Assert.assertFalse(result.hasPath("someProperty"));
+		Assert.assertFalse(result.hasPath("my.overridable.property2"));
+
+		// Test allowing override of a subconfig
+		config = ConfigFactory.parseMap(ImmutableMap.of(
+				"someProperty", "foo",
+				"my.overridable.property1.key1", "bar",
+				"my.overridable.property1.key2", "baz"
+		));
+		result = SecureJobTemplate.filterUserConfig(template, config, log);
+		Assert.assertEquals(result.entrySet().size(), 2);
+		Assert.assertEquals(result.getString("my.overridable.property1.key1"), "bar");
+		Assert.assertEquals(result.getString("my.overridable.property1.key2"), "baz");
+
+		// Test multiple overrides
+		config = ConfigFactory.parseMap(ImmutableMap.of(
+				"someProperty", "foo",
+				"my.overridable.property1", "bar",
+				"my.overridable.property2", "baz"
+		));
+		result = SecureJobTemplate.filterUserConfig(template, config, log);
+		Assert.assertEquals(result.entrySet().size(), 2);
+		Assert.assertEquals(result.getString("my.overridable.property1"), "bar");
+		Assert.assertEquals(result.getString("my.overridable.property2"), "baz");
+	}
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_spec/JobSpecResolverTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_spec/JobSpecResolverTest.java
new file mode 100644
index 0000000..7ba28d7
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_spec/JobSpecResolverTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import java.net.URI;
+
+import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SecureJobTemplate;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+public class JobSpecResolverTest {
+
+	@Test
+	public void test() throws Exception {
+
+		Config sysConfig = ConfigFactory.empty();
+
+		JobTemplate jobTemplate = Mockito.mock(JobTemplate.class);
+		Mockito.when(jobTemplate.getResolvedConfig(Mockito.any(Config.class))).thenAnswer(i -> {
+			Config userConfig = (Config) i.getArguments()[0];
+			return ConfigFactory.parseMap(ImmutableMap.of("template.value", "foo")).withFallback(userConfig);
+		});
+
+		JobCatalogWithTemplates catalog = Mockito.mock(JobCatalogWithTemplates.class);
+		Mockito.when(catalog.getTemplate(Mockito.eq(URI.create("my://template")))).thenReturn(jobTemplate);
+
+		JobSpecResolver resolver = JobSpecResolver.builder(sysConfig).jobCatalog(catalog).build();
+		JobSpec jobSpec = JobSpec.builder()
+				.withConfig(ConfigFactory.parseMap(ImmutableMap.of("key", "value")))
+				.withTemplate(URI.create("my://template")).build();
+		ResolvedJobSpec resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
+
+		Assert.assertEquals(resolvedJobSpec.getOriginalJobSpec(), jobSpec);
+		Assert.assertEquals(resolvedJobSpec.getConfig().entrySet().size(), 2);
+		Assert.assertEquals(resolvedJobSpec.getConfig().getString("key"), "value");
+		Assert.assertEquals(resolvedJobSpec.getConfig().getString("template.value"), "foo");
+	}
+
+	@Test
+	public void testWithResolutionAction() throws Exception {
+
+		Config sysConfig = ConfigFactory.empty();
+
+		SecureJobTemplate insecureTemplate = Mockito.mock(SecureJobTemplate.class);
+		Mockito.when(insecureTemplate.getResolvedConfig(Mockito.any(Config.class))).thenAnswer(i -> (Config) i.getArguments()[0]);
+		Mockito.when(insecureTemplate.isSecure()).thenReturn(false);
+
+		SecureJobTemplate secureTemplate = Mockito.mock(SecureJobTemplate.class);
+		Mockito.when(secureTemplate.getResolvedConfig(Mockito.any(Config.class))).thenAnswer(i -> (Config) i.getArguments()[0]);
+		Mockito.when(secureTemplate.isSecure()).thenReturn(true);
+
+		JobCatalogWithTemplates catalog = Mockito.mock(JobCatalogWithTemplates.class);
+		Mockito.when(catalog.getTemplate(Mockito.eq(URI.create("my://template.insecure")))).thenReturn(insecureTemplate);
+		Mockito.when(catalog.getTemplate(Mockito.eq(URI.create("my://template.secure")))).thenReturn(secureTemplate);
+
+		JobSpecResolver resolver = JobSpecResolver.builder(sysConfig).jobCatalog(catalog)
+				// This resolution action should block any resolution that does not use a secure template
+				.jobResolutionAction(new SecureTemplateEnforcer()).build();
+
+		JobSpec jobSpec = JobSpec.builder()
+				.withConfig(ConfigFactory.parseMap(ImmutableMap.of("key", "value")))
+				.withTemplate(URI.create("my://template.insecure")).build();
+		Assert.expectThrows(JobTemplate.TemplateException.class, () -> resolver.resolveJobSpec(jobSpec));
+
+		JobSpec jobSpec2 = JobSpec.builder()
+				.withConfig(ConfigFactory.parseMap(ImmutableMap.of("key", "value")))
+				.withTemplate(URI.create("my://template.secure")).build();
+		ResolvedJobSpec resolvedJobSpec = resolver.resolveJobSpec(jobSpec2);
+
+		Assert.assertEquals(resolvedJobSpec.getOriginalJobSpec(), jobSpec2);
+		Assert.assertEquals(resolvedJobSpec.getConfig().entrySet().size(), 1);
+		Assert.assertEquals(resolvedJobSpec.getConfig().getString("key"), "value");
+	}
+
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
index ce42f8e..392c981 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
@@ -74,4 +74,29 @@ public class StaticJobTemplateTest {
     Assert.assertEquals(resolved.getString("required2"), "r2");
   }
 
+  @Test
+  public void testSecure() throws Exception {
+    Map<String, Object> confMap = Maps.newHashMap();
+    confMap.put("nonOverridableKey", "value1");
+    confMap.put("overridableKey", "value1");
+    confMap.put(StaticJobTemplate.IS_SECURE_KEY, true);
+    confMap.put(StaticJobTemplate.SECURE_OVERRIDABLE_PROPERTIES_KEYS, "overridableKey, overridableKey2");
+
+    StaticJobTemplate template = new StaticJobTemplate(URI.create("my://template"), "1", "desc", ConfigFactory.parseMap(confMap), null);
+
+    Config userConfig = ConfigFactory.parseMap(ImmutableMap.of(
+        "overridableKey", "override",
+        "overridableKey2", "override2",
+        "nonOverridableKey", "override",
+        "somethingElse", "override"));
+    Config resolved = template.getResolvedConfig(userConfig);
+
+    Assert.assertEquals(resolved.entrySet().size(), 5);
+    Assert.assertEquals(resolved.getString("nonOverridableKey"), "value1");
+    Assert.assertEquals(resolved.getString("overridableKey"), "override");
+    Assert.assertEquals(resolved.getString("overridableKey2"), "override2");
+    Assert.assertFalse(resolved.hasPath("somethingElse"));
+
+  }
+
 }
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
index 43696f8..476ba2a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.util;
 
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.gobblin.util.filesystem.PathAlterationListener;
 import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
 import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
@@ -118,7 +121,7 @@ public class SchedulerUtilsTest {
       throws ConfigurationException, IOException {
     Properties properties = new Properties();
     properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir.getAbsolutePath());
-    List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties);
+    List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, JobSpecResolver.mock());
     Assert.assertEquals(jobConfigs.size(), 4);
 
     // test-job-conf-dir/test1/test11/test111.pull
@@ -174,7 +177,7 @@ public class SchedulerUtilsTest {
     Properties properties = new Properties();
     properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir.getAbsolutePath());
     List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, commonPropsPath,
-        new Path(this.jobConfigDir.getAbsolutePath()));
+        new Path(this.jobConfigDir.getAbsolutePath()), JobSpecResolver.mock());
     Assert.assertEquals(jobConfigs.size(), 3);
 
     // test-job-conf-dir/test1/test11/test111.pull
@@ -210,7 +213,8 @@ public class SchedulerUtilsTest {
     Properties properties = new Properties();
     properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir.getAbsolutePath());
     Properties jobProps =
-        SchedulerUtils.loadGenericJobConfig(properties, jobConfigPath, new Path(this.jobConfigDir.getAbsolutePath()));
+        SchedulerUtils.loadGenericJobConfig(properties, jobConfigPath, new Path(this.jobConfigDir.getAbsolutePath()),
+            JobSpecResolver.builder(ConfigFactory.empty()).build());
 
     Assert.assertEquals(jobProps.stringPropertyNames().size(), 7);
     Assert.assertTrue(jobProps.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) || jobProps.containsKey(
@@ -275,12 +279,14 @@ public class SchedulerUtilsTest {
     Path path = new Path(getClass().getClassLoader().getResource("schedulerUtilsTest").getFile());
 
     Properties pullFile =
-        SchedulerUtils.loadGenericJobConfig(new Properties(), new Path(path, "templated.pull"), path);
+        SchedulerUtils.loadGenericJobConfig(new Properties(), new Path(path, "templated.pull"), path,
+            JobSpecResolver.builder(ConfigFactory.empty()).build());
 
     Assert.assertEquals(pullFile.getProperty("gobblin.dataset.pattern"), "pattern");
     Assert.assertEquals(pullFile.getProperty("job.name"), "GobblinDatabaseCopyTest");
 
-    List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(new Properties(), new Path(path, "templated.pull"), path);
+    List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(new Properties(), new Path(path, "templated.pull"),
+        path, JobSpecResolver.mock());
     Properties pullFile2 = getJobConfigForFile(jobConfigs, "templated.pull");
 
     Assert.assertEquals(pullFile2.getProperty("gobblin.dataset.pattern"), "pattern");
@@ -288,7 +294,7 @@ public class SchedulerUtilsTest {
 
     Properties props = new Properties();
     props.put(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, path.toString());
-    List<Properties> jobConfigs3 = SchedulerUtils.loadGenericJobConfigs(props);
+    List<Properties> jobConfigs3 = SchedulerUtils.loadGenericJobConfigs(props, JobSpecResolver.mock());
     Properties pullFile3 = getJobConfigForFile(jobConfigs3, "templated.pull");
 
     Assert.assertEquals(pullFile3.getProperty("gobblin.dataset.pattern"), "pattern");
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 5f8dfda..53c8e37 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
 import com.typesafe.config.ConfigResolveOptions;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -37,8 +38,11 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
 import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
@@ -66,6 +70,8 @@ public class StaticFlowTemplate implements FlowTemplate {
 
   private transient Config rawConfig;
 
+  private final transient JobSpecResolver jobSpecResolver;
+
   public StaticFlowTemplate(URI flowTemplateDirUri, String version, String description, Config config,
       FlowCatalogWithTemplates catalog)
       throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
@@ -75,6 +81,7 @@ public class StaticFlowTemplate implements FlowTemplate {
     this.rawConfig = config;
     this.catalog = catalog;
     this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDirUri);
+    this.jobSpecResolver = JobSpecResolver.builder(config).build();
   }
 
   //Constructor for testing purposes
@@ -85,6 +92,11 @@ public class StaticFlowTemplate implements FlowTemplate {
     this.rawConfig = config;
     this.catalog = catalog;
     this.jobTemplates = jobTemplates;
+    try {
+      this.jobSpecResolver = JobSpecResolver.builder(config).build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
   }
 
 
@@ -153,17 +165,17 @@ public class StaticFlowTemplate implements FlowTemplate {
    * @return true if the {@link FlowTemplate} is resolvable
    */
   @Override
-  public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
-      throws SpecNotFoundException, JobTemplate.TemplateException {
+  public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
     Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
-    ConfigResolveOptions resolveOptions = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+    JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
     for (JobTemplate template: this.jobTemplates) {
-      Config templateConfig = template.getResolvedConfig(userConfig).resolve(resolveOptions);
-      if (!template.getResolvedConfig(userConfig).resolve(resolveOptions).isResolved()) {
+      try {
+        this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+      } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException exc) {
         return false;
       }
     }
@@ -179,9 +191,11 @@ public class StaticFlowTemplate implements FlowTemplate {
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
     List<Config> resolvedJobConfigs = new ArrayList<>();
+    JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
     for (JobTemplate jobTemplate: getJobTemplates()) {
-      Config resolvedJobConfig = jobTemplate.getResolvedConfig(userConfig).resolve().withValue(
-          ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));;
+      ResolvedJobSpec resolvedJobSpec = this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(jobTemplate).build());
+      Config resolvedJobConfig = resolvedJobSpec.getConfig().withValue(
+          ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));
       resolvedJobConfigs.add(resolvedJobConfig);
     }
     return resolvedJobConfigs;