You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/24 22:01:00 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-701] Add secure
templates (duplicate of #2571)
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7d260ab [GOBBLIN-701] Add secure templates (duplicate of #2571)
7d260ab is described below
commit 7d260ab2c32a773e317a31cd264f5c97d2d48a10
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Sep 24 15:00:52 2019 -0700
[GOBBLIN-701] Add secure templates (duplicate of #2571)
Closes #2739 from jack-moseley/secure-template
---
.../gobblin/aws/AWSJobConfigurationManager.java | 9 +-
.../gobblin/cluster/JobConfigurationManager.java | 10 +-
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 13 +-
.../orchestration/AzkabanProjectConfigTest.java | 12 +-
.../org/apache/gobblin/runtime/api/JobSpec.java | 15 +-
.../apache/gobblin/runtime/api/JobTemplate.java | 3 +-
.../gobblin/runtime/api/SecureJobTemplate.java | 69 +++++++++
.../gobblin/runtime/embedded/EmbeddedGobblin.java | 12 +-
.../runtime/job_spec/JobResolutionCallbacks.java | 42 ++++++
.../gobblin/runtime/job_spec/JobSpecResolver.java | 167 +++++++++++++++++++++
.../gobblin/runtime/job_spec/ResolvedJobSpec.java | 18 ++-
.../runtime/job_spec/SecureTemplateEnforcer.java | 43 ++++++
.../runtime/template/StaticJobTemplate.java | 24 ++-
.../org/apache/gobblin/scheduler/JobScheduler.java | 9 +-
.../PathAlterationListenerAdaptorForMonitor.java | 7 +-
.../org/apache/gobblin/util/SchedulerUtils.java | 30 ++--
.../gobblin/runtime/api/SecureJobTemplateTest.java | 79 ++++++++++
.../runtime/job_spec/JobSpecResolverTest.java | 100 ++++++++++++
.../runtime/template/StaticJobTemplateTest.java | 25 +++
.../apache/gobblin/util/SchedulerUtilsTest.java | 18 ++-
.../modules/template/StaticFlowTemplate.java | 28 +++-
21 files changed, 678 insertions(+), 55 deletions(-)
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
index 042503f..cc99373 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +88,7 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
private final long refreshIntervalInSeconds;
private final ScheduledExecutorService fetchJobConfExecutor;
+ private final JobSpecResolver jobSpecResolver;
public AWSJobConfigurationManager(EventBus eventBus, Config config) {
super(eventBus, config);
@@ -100,6 +102,11 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
this.fetchJobConfExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobConfExecutor")));
+ try {
+ this.jobSpecResolver = JobSpecResolver.builder(config).build();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
}
private void fetchJobConfSettings() {
@@ -155,7 +162,7 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
final Properties properties = ConfigUtils.configToProperties(this.config);
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, jobConfigDir.getAbsolutePath());
- final List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties);
+ final List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
for (Properties config : jobConfigs) {
LOGGER.debug("Config value: " + config);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index 34ff439..c2ba8a4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -18,9 +18,11 @@
package org.apache.gobblin.cluster;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +68,7 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
protected final EventBus eventBus;
protected final Config config;
protected Optional<String> jobConfDirPath;
+ protected final JobSpecResolver jobSpecResolver;
public JobConfigurationManager(EventBus eventBus, Config config) {
this.eventBus = eventBus;
@@ -74,6 +77,11 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
this.jobConfDirPath =
config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY) ? Optional
.of(config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) : Optional.<String>absent();
+ try {
+ this.jobSpecResolver = JobSpecResolver.builder(config).build();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
}
@Override
@@ -94,7 +102,7 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
LOGGER.info("Loading job configurations from " + jobConfigDir);
Properties properties = ConfigUtils.configToProperties(this.config);
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, "file://" + jobConfigDir.getAbsolutePath());
- List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties);
+ List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
for (Properties config : jobConfigs) {
postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), config);
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 05c1bb9..6b99f81 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -32,6 +32,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Level;
@@ -66,7 +69,6 @@ import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.app.ApplicationException;
import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
import org.apache.gobblin.runtime.listeners.CompositeJobListener;
import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
@@ -199,10 +201,13 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
Properties jobProps = this.props;
if (jobProps.containsKey(TEMPLATE_KEY)) {
+ Config config = ConfigUtils.propertiesToConfig(jobProps);
+ JobSpecResolver resolver = JobSpecResolver.builder(config).build();
+
URI templateUri = new URI(jobProps.getProperty(TEMPLATE_KEY));
- Config resolvedJob = new PackagedTemplatesJobCatalogDecorator().getTemplate(templateUri)
- .getResolvedConfig(ConfigUtils.propertiesToConfig(jobProps));
- jobProps = ConfigUtils.configToProperties(resolvedJob);
+ JobSpec jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
+ ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
+ jobProps = ConfigUtils.configToProperties(resolvedJob.getConfig());
}
GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
index 3a48806..f9f83d3 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
@@ -40,7 +40,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
JobSpec jobSpec = new JobSpec(new URI("uri"), "0.0", "test job spec",
- ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+ ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -55,7 +55,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
- ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+ ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -70,7 +70,8 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
- "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+ "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(),
+ Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -85,7 +86,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
- ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+ ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
@@ -100,7 +101,8 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
- "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
+ "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(),
+ Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 203ea8d..a6ca903 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -67,6 +67,8 @@ public class JobSpec implements Configurable, Spec {
Properties configAsProperties;
/** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
Optional<URI> templateURI;
+ /** Specific instance of template. */
+ transient Optional<JobTemplate> jobTemplate;
/** Metadata can contain properties which are not a part of config, e.g. Verb */
Map<String, String> metadata;
@@ -141,6 +143,7 @@ public class JobSpec implements Configurable, Spec {
private Optional<String> description = Optional.absent();
private Optional<URI> jobCatalogURI = Optional.absent();
private Optional<URI> templateURI = Optional.absent();
+ private Optional<JobTemplate> jobTemplate = Optional.absent();
private Optional<Map> metadata = Optional.absent();
public Builder(URI jobSpecUri) {
@@ -167,7 +170,7 @@ public class JobSpec implements Configurable, Spec {
Preconditions.checkNotNull(this.uri);
Preconditions.checkNotNull(this.version);
return new JobSpec(getURI(), getVersion(), getDescription(), getConfig(),
- getConfigAsProperties(), getTemplateURI(), getMetadata());
+ getConfigAsProperties(), getTemplateURI(), getTemplate(), getMetadata());
}
/** The scheme and authority of the job catalog URI are used to generate JobSpec URIs from
@@ -301,6 +304,16 @@ public class JobSpec implements Configurable, Spec {
return this;
}
+ public Optional<JobTemplate> getTemplate() {
+ return this.jobTemplate;
+ }
+
+ public Builder withTemplate(JobTemplate template) {
+ Preconditions.checkNotNull(template);
+ this.jobTemplate = Optional.of(template);
+ return this;
+ }
+
public Map getDefaultMetadata() {
log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'.");
return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
index 1be22fc..b6460d9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobTemplate.java
@@ -54,7 +54,8 @@ public interface JobTemplate extends Spec {
/**
* Return the combine configuration of template and user customized attributes.
- * @return
+ *
+ * Note, do not use this method directly, instead use {@link org.apache.gobblin.runtime.job_spec.ResolvedJobSpec}.
*/
Config getResolvedConfig(Config userProps) throws SpecNotFoundException, TemplateException;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SecureJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SecureJobTemplate.java
new file mode 100644
index 0000000..c6386d3
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SecureJobTemplate.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * A secure template is a {@link JobTemplate} which only allows the user configuration to specify a static set of
+ * keys. This allows most of the template to be non-overridable to tightly control how a job executes.
+ */
+public interface SecureJobTemplate extends JobTemplate {
+
+ /**
+ * Filter the user config to only preserve the keys allowed by a secure template.
+ */
+ static Config filterUserConfig(SecureJobTemplate template, Config userConfig, Logger logger) {
+ if (!template.isSecure()) {
+ return userConfig;
+ }
+ Config survivingConfig = ConfigFactory.empty();
+ for (String key : template.overridableProperties()) {
+ if (userConfig.hasPath(key)) {
+ survivingConfig = survivingConfig.withValue(key, userConfig.getValue(key));
+ userConfig = userConfig.withoutPath(key);
+ }
+ }
+
+ if (!userConfig.isEmpty()) {
+ logger.warn(String.format("Secure template %s ignored the following keys because they are not overridable: %s",
+ template.getUri().toString(),
+ userConfig.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.joining(", "))));
+ }
+
+ return survivingConfig;
+ }
+
+ /**
+ * @return Whether the template is secure.
+ */
+ boolean isSecure();
+
+ /**
+ * @return If the template is secure, the collection of keys that can be overridden by the user.
+ */
+ Collection<String> overridableProperties();
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index dced320..1a1e858 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.lang3.ClassUtils;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -439,19 +440,16 @@ public class EmbeddedGobblin {
} else {
Config finalConfig = userConfig.withFallback(sysProps);
if (this.template != null) {
- try {
- finalConfig = this.template.getResolvedConfig(finalConfig);
- } catch (SpecNotFoundException | JobTemplate.TemplateException exc) {
- throw new RuntimeException(exc);
- }
+ this.specBuilder.withTemplate(this.template);
}
jobSpec = this.specBuilder.withConfig(finalConfig).build();
}
ResolvedJobSpec resolvedJobSpec;
try {
- resolvedJobSpec = new ResolvedJobSpec(jobSpec);
- } catch (SpecNotFoundException | JobTemplate.TemplateException exc) {
+ JobSpecResolver resolver = JobSpecResolver.builder(sysProps).build();
+ resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
+ } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException exc) {
throw new RuntimeException("Failed to resolved template.", exc);
}
final JobCatalog jobCatalog = new StaticJobCatalog(Optional.of(this.useLog), Lists.<JobSpec>newArrayList(resolvedJobSpec));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobResolutionCallbacks.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobResolutionCallbacks.java
new file mode 100644
index 0000000..5bd72ee
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobResolutionCallbacks.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+
+
+/**
+ * Callbacks executed during job resolution by {@link JobSpecResolver}.
+ */
+@Alpha
+public interface JobResolutionCallbacks {
+
+ /**
+ * Called before a job is resolved, providing the original job spec and template to be used.
+ */
+ void beforeResolution(JobSpecResolver jobSpecResolver, JobSpec jobSpec, JobTemplate jobTemplate)
+ throws JobTemplate.TemplateException;
+
+ /**
+ * Called after a job is resolved, providing the final resolved spec.
+ */
+ void afterResolution(JobSpecResolver jobSpecResolver, ResolvedJobSpec resolvedJobSpec);
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobSpecResolver.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobSpecResolver.java
new file mode 100644
index 0000000..455e18d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/JobSpecResolver.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
+import org.apache.gobblin.runtime.api.JobCatalog;
+import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_catalog.InMemoryJobCatalog;
+import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+
+
+/**
+ * Resolves {@link JobSpec}s into {@link ResolvedJobSpec}s. This means:
+ * - Applying the template to obtain the final job config.
+ * - Calling resolution callbacks.
+ */
+@AllArgsConstructor
+public class JobSpecResolver {
+
+ public static final String JOB_RESOLUTION_ACTIONS_KEY = "org.apache.gobblin.jobResolution.actions";
+
+ private final JobCatalogWithTemplates jobCatalog;
+ private final List<JobResolutionCallbacks> jobResolutionCallbacks;
+ private final Config sysConfig;
+
+ /**
+ * Obtain a mock {@link JobSpecResolver} with an empty configuration. It does not run any callbacks.
+ * @return a {@link JobSpecResolver} built from an empty system configuration.
+ */
+ public static JobSpecResolver mock() {
+ try {
+ return new Builder().sysConfig(ConfigFactory.empty()).build();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unexpected error. This is an error in code.", ioe);
+ }
+ }
+
+ /**
+ * @return a Builder for {@link JobSpecResolver}.
+ */
+ public static Builder builder(GobblinInstanceDriver driver) throws IOException {
+ return new Builder().sysConfig(driver.getSysConfig().getConfig()).jobCatalog(driver.getJobCatalog());
+ }
+
+ /**
+ * @return a Builder for {@link JobSpecResolver}.
+ */
+ public static Builder builder(Config sysConfig) throws IOException {
+ return new Builder().sysConfig(sysConfig);
+ }
+
+ /**
+ * Used for building {@link JobSpecResolver}.
+ */
+ public static class Builder {
+ private JobCatalogWithTemplates jobCatalog = new PackagedTemplatesJobCatalogDecorator(new InMemoryJobCatalog());
+ private List<JobResolutionCallbacks> jobResolutionCallbacks = new ArrayList<>();
+ private Config sysConfig;
+
+ private Builder sysConfig(Config sysConfig) throws IOException {
+ this.sysConfig = sysConfig;
+ for (String action : Splitter.on(",").trimResults().omitEmptyStrings()
+ .split(ConfigUtils.getString(sysConfig, JOB_RESOLUTION_ACTIONS_KEY, ""))) {
+ try {
+ ClassAliasResolver<JobResolutionCallbacks> resolver = new ClassAliasResolver<>(JobResolutionCallbacks.class);
+ JobResolutionCallbacks actionInstance = resolver.resolveClass(action).newInstance();
+ this.jobResolutionCallbacks.add(actionInstance);
+ } catch (ReflectiveOperationException roe) {
+ throw new IOException(roe);
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Set the job catalog where templates may be found.
+ */
+ public Builder jobCatalog(JobCatalog jobCatalog) {
+ this.jobCatalog = new PackagedTemplatesJobCatalogDecorator(jobCatalog);
+ return this;
+ }
+
+ /**
+ * Add a {@link JobResolutionCallbacks} to the resolution process.
+ */
+ public Builder jobResolutionAction(JobResolutionCallbacks action) {
+ this.jobResolutionCallbacks.add(action);
+ return this;
+ }
+
+ /**
+ * @return a {@link JobSpecResolver}
+ */
+ public JobSpecResolver build() {
+ return new JobSpecResolver(this.jobCatalog, this.jobResolutionCallbacks, this.sysConfig);
+ }
+ }
+
+ /**
+ * Resolve an input {@link JobSpec} applying any templates.
+ *
+ * @throws SpecNotFoundException If no template exists with the specified URI.
+ * @throws JobTemplate.TemplateException If the template throws an exception during resolution.
+ * @throws ConfigException If the job config cannot be resolved.
+ */
+ public ResolvedJobSpec resolveJobSpec(JobSpec jobSpec)
+ throws SpecNotFoundException, JobTemplate.TemplateException, ConfigException {
+ if (jobSpec instanceof ResolvedJobSpec) {
+ return (ResolvedJobSpec) jobSpec;
+ }
+
+ JobTemplate jobTemplate = null;
+ if (jobSpec.getJobTemplate().isPresent()) {
+ jobTemplate = jobSpec.getJobTemplate().get();
+ } else if (jobSpec.getTemplateURI().isPresent()) {
+ jobTemplate = jobCatalog.getTemplate(jobSpec.getTemplateURI().get());
+ }
+
+ for (JobResolutionCallbacks action : this.jobResolutionCallbacks) {
+ action.beforeResolution(this, jobSpec, jobTemplate);
+ }
+ Config resolvedConfig;
+ if (jobTemplate == null) {
+ resolvedConfig = jobSpec.getConfig().resolve();
+ } else {
+ resolvedConfig = jobTemplate.getResolvedConfig(jobSpec.getConfig()).resolve();
+ }
+ ResolvedJobSpec resolvedJobSpec = new ResolvedJobSpec(jobSpec, resolvedConfig);
+ for (JobResolutionCallbacks action : this.jobResolutionCallbacks) {
+ action.afterResolution(this, resolvedJobSpec);
+ }
+
+ return resolvedJobSpec;
+ }
+
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
index b64f178..795e3b4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
@@ -44,10 +44,18 @@ public class ResolvedJobSpec extends JobSpec {
@Getter
private final JobSpec originalJobSpec;
+ /**
+ * @deprecated Use {@link JobSpecResolver}
+ */
+ @Deprecated
public ResolvedJobSpec(JobSpec other) throws SpecNotFoundException, JobTemplate.TemplateException {
this(other, new InMemoryJobCatalog());
}
+ /**
+ * @deprecated Use {@link JobSpecResolver}
+ */
+ @Deprecated
public ResolvedJobSpec(JobSpec other, GobblinInstanceDriver driver)
throws SpecNotFoundException, JobTemplate.TemplateException {
this(other, driver.getJobCatalog());
@@ -55,11 +63,19 @@ public class ResolvedJobSpec extends JobSpec {
/**
* Resolve the job spec using classpath templates as well as any templates available in the input {@link JobCatalog}.
+ * @deprecated Use {@link JobSpecResolver}
*/
+ @Deprecated
public ResolvedJobSpec(JobSpec other, JobCatalog catalog)
throws SpecNotFoundException, JobTemplate.TemplateException {
super(other.getUri(), other.getVersion(), other.getDescription(), resolveConfig(other, catalog),
- ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI(), other.getMetadata());
+ ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI(), other.getJobTemplate(), other.getMetadata());
+ this.originalJobSpec = other;
+ }
+
+ ResolvedJobSpec(JobSpec other, Config resolvedConfig) {
+ super(other.getUri(), other.getVersion(), other.getDescription(), resolvedConfig,
+ ConfigUtils.configToProperties(resolvedConfig), other.getTemplateURI(), other.getJobTemplate(), other.getMetadata());
this.originalJobSpec = other;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/SecureTemplateEnforcer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/SecureTemplateEnforcer.java
new file mode 100644
index 0000000..2605c05
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/SecureTemplateEnforcer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SecureJobTemplate;
+
+
+/**
+ * A {@link JobResolutionCallbacks} that blocks resolution of any job that does not use a secure template.
+ */
+public class SecureTemplateEnforcer implements JobResolutionCallbacks {
+
+ @Override
+ public void beforeResolution(JobSpecResolver jobSpecResolver, JobSpec jobSpec, JobTemplate jobTemplate)
+ throws JobTemplate.TemplateException {
+ if (!(jobTemplate instanceof SecureJobTemplate) || !((SecureJobTemplate) jobTemplate).isSecure()) {
+ throw new JobTemplate.TemplateException(String.format("Unallowed template resolution. %s is not a secure template.",
+ jobTemplate == null ? "null" : jobTemplate.getUri()));
+ }
+ }
+
+ @Override
+ public void afterResolution(JobSpecResolver jobSpecResolver, ResolvedJobSpec resolvedJobSpec) {
+ // NOOP
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
index 6c69370..83e65f1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -33,17 +34,23 @@ import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SecureJobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.util.ConfigUtils;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
/**
* A {@link JobTemplate} using a static {@link Config} as the raw configuration for the template.
*/
-public class StaticJobTemplate extends InheritingJobTemplate {
+@Slf4j
+public class StaticJobTemplate extends InheritingJobTemplate implements SecureJobTemplate {
public static final String SUPER_TEMPLATE_KEY = "gobblin.template.inherit";
+ public static final String IS_SECURE_KEY = "gobblin.template.isSecure";
+ public static final String SECURE_OVERRIDABLE_PROPERTIES_KEYS = "gobblin.template.secure.overridableProperties";
private Config rawConfig;
private Set<String> requiredAttributes;
@@ -80,7 +87,7 @@ public class StaticJobTemplate extends InheritingJobTemplate {
throws TemplateException {
if (config.hasPath(SUPER_TEMPLATE_KEY)) {
List<URI> uris = Lists.newArrayList();
- for (String uriString : config.getString(SUPER_TEMPLATE_KEY).split(",")) {
+ for (String uriString : ConfigUtils.getStringList(config, SUPER_TEMPLATE_KEY)) {
try {
uris.add(new URI(uriString));
} catch (URISyntaxException use) {
@@ -105,6 +112,17 @@ public class StaticJobTemplate extends InheritingJobTemplate {
@Override
protected Config getLocallyResolvedConfig(Config userConfig) {
- return userConfig.withFallback(this.rawConfig);
+ Config filteredUserConfig = SecureJobTemplate.filterUserConfig(this, userConfig, log);
+ return filteredUserConfig.withFallback(this.rawConfig);
+ }
+
+ @Override
+ public boolean isSecure() {
+ return ConfigUtils.getBoolean(this.rawConfig, IS_SECURE_KEY, false); // defaults to false: a template is secure only if its raw config sets IS_SECURE_KEY
+ }
+
+ @Override
+ public Collection<String> overridableProperties() {
+ return isSecure() ? ConfigUtils.getStringList(this.rawConfig, SECURE_OVERRIDABLE_PROPERTIES_KEYS) : Collections.emptyList(); // a non-secure template reports an empty list
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index a675d99..fe3640a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -33,7 +33,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.source.Source;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.Path;
import org.quartz.CronScheduleBuilder;
@@ -137,6 +139,9 @@ public class JobScheduler extends AbstractIdleService {
private final Closer closer = Closer.create();
@Getter
+ private final JobSpecResolver jobSpecResolver;
+
+ @Getter
private volatile boolean cancelRequested = false;
public JobScheduler(Properties properties, SchedulerService scheduler)
@@ -171,6 +176,8 @@ public class JobScheduler extends AbstractIdleService {
this.jobConfigFileDirPath = null;
this.listener = null;
}
+
+ this.jobSpecResolver = JobSpecResolver.builder(ConfigUtils.propertiesToConfig(properties)).build();
}
@Override
@@ -523,7 +530,7 @@ public class JobScheduler extends AbstractIdleService {
*/
private List<Properties> loadGeneralJobConfigs()
throws ConfigurationException, IOException {
- List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(this.properties);
+ List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(this.properties, this.jobSpecResolver);
LOG.info(String.format("Loaded %d job configurations", jobConfigs.size()));
return jobConfigs;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java
index 0b705e0..a4d837a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/PathAlterationListenerAdaptorForMonitor.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -51,11 +52,13 @@ public class PathAlterationListenerAdaptorForMonitor extends PathAlterationListe
JobScheduler jobScheduler;
/** Store path to job mappings. Required for correctly unscheduling. */
private final Map<Path, String> jobNameMap;
+ private final JobSpecResolver jobSpecResolver;
PathAlterationListenerAdaptorForMonitor(Path jobConfigFileDirPath, JobScheduler jobScheduler) {
this.jobConfigFileDirPath = jobConfigFileDirPath;
this.jobScheduler = jobScheduler;
this.jobNameMap = Maps.newConcurrentMap();
+ this.jobSpecResolver = jobScheduler.getJobSpecResolver();
}
private Path getJobPath(Properties jobProps) {
@@ -73,7 +76,7 @@ public class PathAlterationListenerAdaptorForMonitor extends PathAlterationListe
String customizedInfo = "";
try {
Properties jobProps =
- SchedulerUtils.loadGenericJobConfig(this.jobScheduler.properties, path, jobConfigFileDirPath);
+ SchedulerUtils.loadGenericJobConfig(this.jobScheduler.properties, path, jobConfigFileDirPath, this.jobSpecResolver);
LOG.debug("Loaded job properties: {}", jobProps);
switch (action) {
case SCHEDULE:
@@ -103,7 +106,7 @@ public class PathAlterationListenerAdaptorForMonitor extends PathAlterationListe
String customizedInfoResult = "";
try {
for (Properties jobProps : SchedulerUtils.loadGenericJobConfigs(jobScheduler.properties, path,
- jobConfigFileDirPath)) {
+ jobConfigFileDirPath, this.jobSpecResolver)) {
try {
switch (action) {
case SCHEDULE:
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
index 1174c62..efb3c2e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
@@ -25,6 +25,8 @@ import java.util.Properties;
import java.util.Set;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -65,7 +67,7 @@ public class SchedulerUtils {
* @param sysProps Gobblin framework configuration properties
* @return a list of job configurations in the form of {@link java.util.Properties}
*/
- public static List<Properties> loadGenericJobConfigs(Properties sysProps)
+ public static List<Properties> loadGenericJobConfigs(Properties sysProps, JobSpecResolver resolver)
throws ConfigurationException, IOException {
Path rootPath = new Path(sysProps.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY));
PullFileLoader loader = new PullFileLoader(rootPath, rootPath.getFileSystem(new Configuration()),
@@ -77,7 +79,7 @@ public class SchedulerUtils {
List<Properties> jobConfigs = Lists.newArrayList();
for (Config config : configs) {
try {
- jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config)));
+ jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
} catch (IOException ioe) {
LOGGER.error("Could not parse job config at " + ConfigUtils.getString(config,
ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, "Unknown path"), ioe);
@@ -96,7 +98,7 @@ public class SchedulerUtils {
* @return a list of job configurations in the form of {@link java.util.Properties}
*/
public static List<Properties> loadGenericJobConfigs(Properties sysProps, Path commonPropsPath,
- Path jobConfigPathDir)
+ Path jobConfigPathDir, JobSpecResolver resolver)
throws ConfigurationException, IOException {
PullFileLoader loader = new PullFileLoader(jobConfigPathDir, jobConfigPathDir.getFileSystem(new Configuration()),
@@ -108,7 +110,7 @@ public class SchedulerUtils {
List<Properties> jobConfigs = Lists.newArrayList();
for (Config config : configs) {
try {
- jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config)));
+ jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
} catch (IOException ioe) {
LOGGER.error("Could not parse job config at " + ConfigUtils.getString(config,
ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY, "Unknown path"), ioe);
@@ -126,15 +128,15 @@ public class SchedulerUtils {
* @param jobConfigPathDir root job configuration file directory
* @return a job configuration in the form of {@link java.util.Properties}
*/
- public static Properties loadGenericJobConfig(Properties sysProps, Path jobConfigPath, Path jobConfigPathDir)
- throws ConfigurationException, IOException {
+ public static Properties loadGenericJobConfig(Properties sysProps, Path jobConfigPath, Path jobConfigPathDir,
+ JobSpecResolver resolver) throws ConfigurationException, IOException {
PullFileLoader loader = new PullFileLoader(jobConfigPathDir, jobConfigPathDir.getFileSystem(new Configuration()),
getJobConfigurationFileExtensions(sysProps), PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
Config sysConfig = ConfigUtils.propertiesToConfig(sysProps);
Config config = loader.loadPullFile(jobConfigPath, sysConfig, true);
- return resolveTemplate(ConfigUtils.configToProperties(config));
+ return resolveTemplate(ConfigUtils.configToProperties(config), resolver);
}
/**
@@ -168,18 +170,16 @@ public class SchedulerUtils {
}));
}
- private static Properties resolveTemplate(Properties jobProps) throws IOException {
+ private static Properties resolveTemplate(Properties jobProps, JobSpecResolver resolver) throws IOException {
try {
+ JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(ConfigUtils.propertiesToConfig(jobProps));
if (jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
- Config jobConfig = ConfigUtils.propertiesToConfig(jobProps);
- Properties resolvedProps = ConfigUtils.configToProperties((ResourceBasedJobTemplate
+ JobTemplate jobTemplate = ResourceBasedJobTemplate
.forResourcePath(jobProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH),
- new PackagedTemplatesJobCatalogDecorator()))
- .getResolvedConfig(jobConfig));
- return resolvedProps;
- } else {
- return jobProps;
+ new PackagedTemplatesJobCatalogDecorator());
+ jobSpecBuilder.withTemplate(jobTemplate);
}
+ return ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpecBuilder.build()).getConfig());
} catch (JobTemplate.TemplateException | SpecNotFoundException | URISyntaxException exc) {
throw new IOException(exc);
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/SecureJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/SecureJobTemplateTest.java
new file mode 100644
index 0000000..4b30f89
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/SecureJobTemplateTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class SecureJobTemplateTest {
+
+ @Test
+ public void test() {
+
+ SecureJobTemplate template = Mockito.mock(SecureJobTemplate.class);
+ Mockito.when(template.getUri()).thenReturn(URI.create("my://template"));
+ Mockito.when(template.isSecure()).thenReturn(true);
+ Mockito.when(template.overridableProperties()).thenReturn(Lists.newArrayList("my.overridable.property1", "my.overridable.property2"));
+
+ // Test simple filtering: of the user keys, only the one listed in overridableProperties() survives
+ Config config = ConfigFactory.parseMap(ImmutableMap.of(
+ "someProperty", "foo",
+ "my.overridable.property1", "bar"
+ ));
+ Config result = SecureJobTemplate.filterUserConfig(template, config, log);
+ Assert.assertEquals(result.entrySet().size(), 1);
+ Assert.assertEquals(result.getString("my.overridable.property1"), "bar");
+ Assert.assertFalse(result.hasPath("someProperty"));
+ Assert.assertFalse(result.hasPath("my.overridable.property2"));
+
+ // Test allowing override of a subconfig: keys nested under an overridable property are kept
+ config = ConfigFactory.parseMap(ImmutableMap.of(
+ "someProperty", "foo",
+ "my.overridable.property1.key1", "bar",
+ "my.overridable.property1.key2", "baz"
+ ));
+ result = SecureJobTemplate.filterUserConfig(template, config, log);
+ Assert.assertEquals(result.entrySet().size(), 2);
+ Assert.assertEquals(result.getString("my.overridable.property1.key1"), "bar");
+ Assert.assertEquals(result.getString("my.overridable.property1.key2"), "baz");
+
+ // Test multiple overrides: both overridable properties are kept and the unlisted key is dropped
+ config = ConfigFactory.parseMap(ImmutableMap.of(
+ "someProperty", "foo",
+ "my.overridable.property1", "bar",
+ "my.overridable.property2", "baz"
+ ));
+ result = SecureJobTemplate.filterUserConfig(template, config, log);
+ Assert.assertEquals(result.entrySet().size(), 2);
+ Assert.assertEquals(result.getString("my.overridable.property1"), "bar");
+ Assert.assertEquals(result.getString("my.overridable.property2"), "baz");
+ }
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_spec/JobSpecResolverTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_spec/JobSpecResolverTest.java
new file mode 100644
index 0000000..7ba28d7
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_spec/JobSpecResolverTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job_spec;
+
+import java.net.URI;
+
+import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SecureJobTemplate;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+public class JobSpecResolverTest {
+
+ @Test
+ public void test() throws Exception {
+
+ Config sysConfig = ConfigFactory.empty();
+
+ JobTemplate jobTemplate = Mockito.mock(JobTemplate.class);
+ Mockito.when(jobTemplate.getResolvedConfig(Mockito.any(Config.class))).thenAnswer(i -> {
+ Config userConfig = (Config) i.getArguments()[0];
+ return ConfigFactory.parseMap(ImmutableMap.of("template.value", "foo")).withFallback(userConfig);
+ });
+
+ JobCatalogWithTemplates catalog = Mockito.mock(JobCatalogWithTemplates.class);
+ Mockito.when(catalog.getTemplate(Mockito.eq(URI.create("my://template")))).thenReturn(jobTemplate);
+
+ JobSpecResolver resolver = JobSpecResolver.builder(sysConfig).jobCatalog(catalog).build();
+ JobSpec jobSpec = JobSpec.builder()
+ .withConfig(ConfigFactory.parseMap(ImmutableMap.of("key", "value")))
+ .withTemplate(URI.create("my://template")).build();
+ ResolvedJobSpec resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
+
+ Assert.assertEquals(resolvedJobSpec.getOriginalJobSpec(), jobSpec);
+ Assert.assertEquals(resolvedJobSpec.getConfig().entrySet().size(), 2); // "key" from the job spec plus "template.value" added by the mocked template
+ Assert.assertEquals(resolvedJobSpec.getConfig().getString("key"), "value");
+ Assert.assertEquals(resolvedJobSpec.getConfig().getString("template.value"), "foo");
+ }
+
+ @Test
+ public void testWithResolutionAction() throws Exception {
+
+ Config sysConfig = ConfigFactory.empty();
+
+ SecureJobTemplate insecureTemplate = Mockito.mock(SecureJobTemplate.class);
+ Mockito.when(insecureTemplate.getResolvedConfig(Mockito.any(Config.class))).thenAnswer(i -> (Config) i.getArguments()[0]);
+ Mockito.when(insecureTemplate.isSecure()).thenReturn(false);
+
+ SecureJobTemplate secureTemplate = Mockito.mock(SecureJobTemplate.class);
+ Mockito.when(secureTemplate.getResolvedConfig(Mockito.any(Config.class))).thenAnswer(i -> (Config) i.getArguments()[0]);
+ Mockito.when(secureTemplate.isSecure()).thenReturn(true);
+
+ JobCatalogWithTemplates catalog = Mockito.mock(JobCatalogWithTemplates.class);
+ Mockito.when(catalog.getTemplate(Mockito.eq(URI.create("my://template.insecure")))).thenReturn(insecureTemplate);
+ Mockito.when(catalog.getTemplate(Mockito.eq(URI.create("my://template.secure")))).thenReturn(secureTemplate);
+
+ JobSpecResolver resolver = JobSpecResolver.builder(sysConfig).jobCatalog(catalog)
+ // This resolution action should block any resolution that does not use a secure template
+ .jobResolutionAction(new SecureTemplateEnforcer()).build();
+
+ JobSpec jobSpec = JobSpec.builder()
+ .withConfig(ConfigFactory.parseMap(ImmutableMap.of("key", "value")))
+ .withTemplate(URI.create("my://template.insecure")).build();
+ Assert.expectThrows(JobTemplate.TemplateException.class, () -> resolver.resolveJobSpec(jobSpec));
+
+ JobSpec jobSpec2 = JobSpec.builder()
+ .withConfig(ConfigFactory.parseMap(ImmutableMap.of("key", "value")))
+ .withTemplate(URI.create("my://template.secure")).build();
+ ResolvedJobSpec resolvedJobSpec = resolver.resolveJobSpec(jobSpec2);
+
+ Assert.assertEquals(resolvedJobSpec.getOriginalJobSpec(), jobSpec2);
+ Assert.assertEquals(resolvedJobSpec.getConfig().entrySet().size(), 1); // the mocked secure template returns the user config unchanged
+ Assert.assertEquals(resolvedJobSpec.getConfig().getString("key"), "value");
+ }
+
+
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
index ce42f8e..392c981 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
@@ -74,4 +74,29 @@ public class StaticJobTemplateTest {
Assert.assertEquals(resolved.getString("required2"), "r2");
}
+ @Test
+ public void testSecure() throws Exception {
+ Map<String, Object> confMap = Maps.newHashMap();
+ confMap.put("nonOverridableKey", "value1");
+ confMap.put("overridableKey", "value1");
+ confMap.put(StaticJobTemplate.IS_SECURE_KEY, true);
+ confMap.put(StaticJobTemplate.SECURE_OVERRIDABLE_PROPERTIES_KEYS, "overridableKey, overridableKey2");
+
+ StaticJobTemplate template = new StaticJobTemplate(URI.create("my://template"), "1", "desc", ConfigFactory.parseMap(confMap), null);
+
+ Config userConfig = ConfigFactory.parseMap(ImmutableMap.of(
+ "overridableKey", "override",
+ "overridableKey2", "override2",
+ "nonOverridableKey", "override",
+ "somethingElse", "override"));
+ Config resolved = template.getResolvedConfig(userConfig);
+
+ Assert.assertEquals(resolved.entrySet().size(), 5); // the 4 template entries plus overridableKey2 supplied by the user
+ Assert.assertEquals(resolved.getString("nonOverridableKey"), "value1"); // the user's override of a non-overridable key is ignored
+ Assert.assertEquals(resolved.getString("overridableKey"), "override");
+ Assert.assertEquals(resolved.getString("overridableKey2"), "override2");
+ Assert.assertFalse(resolved.hasPath("somethingElse"));
+
+ }
+
}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
index 43696f8..476ba2a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/util/SchedulerUtilsTest.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.util;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.util.filesystem.PathAlterationListener;
import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
@@ -118,7 +121,7 @@ public class SchedulerUtilsTest {
throws ConfigurationException, IOException {
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir.getAbsolutePath());
- List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties);
+ List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, JobSpecResolver.mock());
Assert.assertEquals(jobConfigs.size(), 4);
// test-job-conf-dir/test1/test11/test111.pull
@@ -174,7 +177,7 @@ public class SchedulerUtilsTest {
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir.getAbsolutePath());
List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(properties, commonPropsPath,
- new Path(this.jobConfigDir.getAbsolutePath()));
+ new Path(this.jobConfigDir.getAbsolutePath()), JobSpecResolver.mock());
Assert.assertEquals(jobConfigs.size(), 3);
// test-job-conf-dir/test1/test11/test111.pull
@@ -210,7 +213,8 @@ public class SchedulerUtilsTest {
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, this.jobConfigDir.getAbsolutePath());
Properties jobProps =
- SchedulerUtils.loadGenericJobConfig(properties, jobConfigPath, new Path(this.jobConfigDir.getAbsolutePath()));
+ SchedulerUtils.loadGenericJobConfig(properties, jobConfigPath, new Path(this.jobConfigDir.getAbsolutePath()),
+ JobSpecResolver.builder(ConfigFactory.empty()).build());
Assert.assertEquals(jobProps.stringPropertyNames().size(), 7);
Assert.assertTrue(jobProps.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) || jobProps.containsKey(
@@ -275,12 +279,14 @@ public class SchedulerUtilsTest {
Path path = new Path(getClass().getClassLoader().getResource("schedulerUtilsTest").getFile());
Properties pullFile =
- SchedulerUtils.loadGenericJobConfig(new Properties(), new Path(path, "templated.pull"), path);
+ SchedulerUtils.loadGenericJobConfig(new Properties(), new Path(path, "templated.pull"), path,
+ JobSpecResolver.builder(ConfigFactory.empty()).build());
Assert.assertEquals(pullFile.getProperty("gobblin.dataset.pattern"), "pattern");
Assert.assertEquals(pullFile.getProperty("job.name"), "GobblinDatabaseCopyTest");
- List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(new Properties(), new Path(path, "templated.pull"), path);
+ List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(new Properties(), new Path(path, "templated.pull"),
+ path, JobSpecResolver.mock());
Properties pullFile2 = getJobConfigForFile(jobConfigs, "templated.pull");
Assert.assertEquals(pullFile2.getProperty("gobblin.dataset.pattern"), "pattern");
@@ -288,7 +294,7 @@ public class SchedulerUtilsTest {
Properties props = new Properties();
props.put(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, path.toString());
- List<Properties> jobConfigs3 = SchedulerUtils.loadGenericJobConfigs(props);
+ List<Properties> jobConfigs3 = SchedulerUtils.loadGenericJobConfigs(props, JobSpecResolver.mock());
Properties pullFile3 = getJobConfigForFile(jobConfigs3, "templated.pull");
Assert.assertEquals(pullFile3.getProperty("gobblin.dataset.pattern"), "pattern");
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 5f8dfda..53c8e37 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.Pair;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigResolveOptions;
import com.typesafe.config.ConfigValueFactory;
@@ -37,8 +38,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
@@ -66,6 +70,8 @@ public class StaticFlowTemplate implements FlowTemplate {
private transient Config rawConfig;
+ private final transient JobSpecResolver jobSpecResolver;
+
public StaticFlowTemplate(URI flowTemplateDirUri, String version, String description, Config config,
FlowCatalogWithTemplates catalog)
throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
@@ -75,6 +81,7 @@ public class StaticFlowTemplate implements FlowTemplate {
this.rawConfig = config;
this.catalog = catalog;
this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDirUri);
+ this.jobSpecResolver = JobSpecResolver.builder(config).build();
}
//Constructor for testing purposes
@@ -85,6 +92,11 @@ public class StaticFlowTemplate implements FlowTemplate {
this.rawConfig = config;
this.catalog = catalog;
this.jobTemplates = jobTemplates;
+ try {
+ this.jobSpecResolver = JobSpecResolver.builder(config).build();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
}
@@ -153,17 +165,17 @@ public class StaticFlowTemplate implements FlowTemplate {
* @return true if the {@link FlowTemplate} is resolvable
*/
@Override
- public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor)
- throws SpecNotFoundException, JobTemplate.TemplateException {
+ public boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, DatasetDescriptor outputDescriptor) {
Config inputDescriptorConfig = inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
- ConfigResolveOptions resolveOptions = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+ JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
for (JobTemplate template: this.jobTemplates) {
- Config templateConfig = template.getResolvedConfig(userConfig).resolve(resolveOptions);
- if (!template.getResolvedConfig(userConfig).resolve(resolveOptions).isResolved()) {
+ try {
+ this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
+ } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException exc) {
return false;
}
}
@@ -179,9 +191,11 @@ public class StaticFlowTemplate implements FlowTemplate {
userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
List<Config> resolvedJobConfigs = new ArrayList<>();
+ JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
for (JobTemplate jobTemplate: getJobTemplates()) {
- Config resolvedJobConfig = jobTemplate.getResolvedConfig(userConfig).resolve().withValue(
- ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));;
+ ResolvedJobSpec resolvedJobSpec = this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(jobTemplate).build());
+ Config resolvedJobConfig = resolvedJobSpec.getConfig().withValue(
+ ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));
resolvedJobConfigs.add(resolvedJobConfig);
}
return resolvedJobConfigs;