You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/15 22:45:00 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-960] Resolving
multiple templates in top-level
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9f0218b [GOBBLIN-960] Resolving multiple templates in top-level
9f0218b is described below
commit 9f0218b276dbf18cd31af11642dcf23230ebcdfc
Author: autumnust <le...@linkedin.com>
AuthorDate: Fri Nov 15 14:44:53 2019 -0800
[GOBBLIN-960] Resolving multiple templates in top-level
Closes #2809 from autumnust/multiple-templates
---
.../gobblin/runtime/AbstractJobLauncher.java | 27 ++++--
.../org/apache/gobblin/runtime/api/JobSpec.java | 38 +++++++-
.../runtime/template/InheritingJobTemplate.java | 104 +++++++++++++++------
.../runtime/template/StaticJobTemplate.java | 16 +++-
.../gobblin/runtime/LocalJobLauncherTest.java | 81 +++++++++++-----
.../runtime/template/StaticJobTemplateTest.java | 29 +++++-
.../test-multitemplate-with-inheritance.template | 18 ++++
.../resources/templates/test-overwrite.template | 20 ++++
.../org/apache/gobblin/util/AvroFlattener.java | 4 +-
9 files changed, 267 insertions(+), 70 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index d9241c5..f7cecd4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.runtime;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -116,6 +118,12 @@ public abstract class AbstractJobLauncher implements JobLauncher {
public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
+ /** Key holding a comma-separated list of template URIs, making {@link AbstractJobLauncher} capable of loading multiple job templates.
+ * The original {@link #GOBBLIN_JOB_TEMPLATE_KEY} is kept for backward-compatibility and takes precedence when both keys are set.
+ * TODO: Expand support to Gobblin-as-a-Service in FlowTemplateCatalog.
+ * */
+ public static final String GOBBLIN_JOB_MULTI_TEMPLATE_KEY = "gobblin.template.uris";
+
// Job configuration properties
protected final Properties jobProps;
@@ -236,14 +244,21 @@ public abstract class AbstractJobLauncher implements JobLauncher {
public static void resolveGobblinJobTemplateIfNecessary(Properties jobProps) throws IOException, URISyntaxException,
SpecNotFoundException,
JobTemplate.TemplateException {
+ Config config = ConfigUtils.propertiesToConfig(jobProps);
+ JobSpecResolver resolver = JobSpecResolver.builder(config).build();
+ JobSpec jobSpec = null;
if (jobProps.containsKey(GOBBLIN_JOB_TEMPLATE_KEY)) {
- Config config = ConfigUtils.propertiesToConfig(jobProps);
- JobSpecResolver resolver = JobSpecResolver.builder(config).build();
-
URI templateUri = new URI(jobProps.getProperty(GOBBLIN_JOB_TEMPLATE_KEY));
- JobSpec jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
- ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
- jobProps.putAll(ConfigUtils.configToProperties(resolvedJob.getConfig()));
+ jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
+ } else if (jobProps.containsKey(GOBBLIN_JOB_MULTI_TEMPLATE_KEY)) {
+ List<URI> templatesURIs = new ArrayList<>();
+ for (String uri : jobProps.getProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY).split(",")) {
+ templatesURIs.add(new URI(uri));
+ }
+ jobSpec = JobSpec.builder().withConfig(config).withResourceTemplates(templatesURIs).build();
+ }
+ if (jobSpec != null ) {
+ jobProps.putAll(ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpec).getConfig()));
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index a6ca903..2aadad4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -19,9 +19,18 @@ package org.apache.gobblin.runtime.api;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.template.ResourceBasedJobTemplate;
+import org.apache.gobblin.runtime.template.StaticJobTemplate;
+import org.apache.gobblin.util.ConfigUtils;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -29,11 +38,6 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.util.ConfigUtils;
-
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -75,6 +79,7 @@ public class JobSpec implements Configurable, Spec {
/** A Verb identifies if the Spec is for Insert/Update/Delete */
public static final String VERB_KEY = "Verb";
+ private static final String IN_MEMORY_TEMPLATE_URI = "inmemory";
public static Builder builder(URI jobSpecUri) {
return new Builder(jobSpecUri);
@@ -304,6 +309,29 @@ public class JobSpec implements Configurable, Spec {
return this;
}
+ /**
+ * The public interface of {@link JobSpec} doesn't really support multiple templates, so an incoming
+ * list of template URIs is consolidated into a single in-memory template and resolved together. The resolved Config
+ * is not materialized; it resides only in memory throughout the lifecycle.
+ *
+ * Restriction: This method assumes no customized jobTemplateCatalog and uses classpath resources as the input.
+ * Also, the order of the list matters: an earlier template is overwritten by a later one.
+ */
+ public Builder withResourceTemplates(List<URI> templateURIs) {
+ try {
+ List<JobTemplate> templates = new ArrayList<>();
+ for(URI uri : templateURIs) {
+ templates.add(ResourceBasedJobTemplate.forResourcePath(uri.getPath()));
+ }
+ this.jobTemplate = Optional.of(new StaticJobTemplate(new URI(IN_MEMORY_TEMPLATE_URI), "", "",
+ ConfigFactory.empty(), templates));
+
+ } catch (URISyntaxException | SpecNotFoundException | JobTemplate.TemplateException | IOException e) {
+ throw new RuntimeException("Fatal exception: Templates couldn't be resolved properly, ", e);
+ }
+ return this;
+ }
+
public Optional<JobTemplate> getTemplate() {
return this.jobTemplate;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java
index d542ed5..dbe5458 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java
@@ -51,11 +51,15 @@ public abstract class InheritingJobTemplate implements JobTemplate {
this.resolved = false;
}
- public InheritingJobTemplate(List<JobTemplate> superTemplates) {
+ public InheritingJobTemplate(List<JobTemplate> superTemplates, boolean skipResolve) {
this.superTemplateUris = Lists.newArrayList();
this.catalog = null;
this.superTemplates = superTemplates;
- this.resolved = true;
+ this.resolved = skipResolve;
+ }
+
+ public InheritingJobTemplate(List<JobTemplate> superTemplates) {
+ this(superTemplates, true);
}
/**
@@ -71,27 +75,62 @@ public abstract class InheritingJobTemplate implements JobTemplate {
resolveTemplates(loadedTemplates);
}
+ /**
+ * Resolves all super-templates held as fields of this class.
+ * Two kinds of resolution are handled by this method:
+ * 1) When the templates are represented as {@link #superTemplateUris}, each template is first loaded from the
+ * catalog and then resolved. The physical {@link #superTemplates} are materialized after that.
+ * 2) org.apache.gobblin.runtime.template.InheritingJobTemplate#InheritingJobTemplate(java.util.List, boolean) allows
+ * callers to provide the physical {@link #superTemplates} directly. This case is detected by a non-null
+ * {@link #superTemplates} when {@link #superTemplateUris} is empty.
+ *
+ */
private void resolveTemplates(Map<URI, JobTemplate> loadedTemplates) throws SpecNotFoundException, TemplateException {
if (this.resolved) {
return;
}
- this.superTemplates = Lists.newArrayList();
- for (URI uri : this.superTemplateUris) {
- if (!loadedTemplates.containsKey(uri)) {
- JobTemplate newTemplate = this.catalog.getTemplate(uri);
- loadedTemplates.put(uri, newTemplate);
- if (newTemplate instanceof InheritingJobTemplate) {
- ((InheritingJobTemplate) newTemplate).resolveTemplates(loadedTemplates);
+
+ if (this.superTemplateUris.size() > 0) {
+ this.superTemplates = Lists.newArrayList();
+ for (URI uri : this.superTemplateUris) {
+ if (!loadedTemplates.containsKey(uri)) {
+ JobTemplate newTemplate = this.catalog.getTemplate(uri);
+ resolveTemplateRecursionHelper(newTemplate, uri, loadedTemplates);
+ }
+ this.superTemplates.add(loadedTemplates.get(uri));
+ }
+ } else if (superTemplates != null ) {
+ for (JobTemplate newTemplate : this.superTemplates) {
+ if (!loadedTemplates.containsKey(newTemplate.getUri())) {
+ resolveTemplateRecursionHelper(newTemplate, newTemplate.getUri(), loadedTemplates);
}
}
- this.superTemplates.add(loadedTemplates.get(uri));
}
+
this.resolved = true;
}
+ /**
+ * The canonicalURI is needed when the template has to be loaded from the catalog, as the URI format is adjusted
+ * while constructing the template.
+ * Essentially, jobCatalog.load(templateUris.get(0)).getUri().equals(templateUris.get(0)) returns false.
+ */
+ private void resolveTemplateRecursionHelper(JobTemplate newTemplate, URI canonicalURI, Map<URI, JobTemplate> loadedTemplates)
+ throws SpecNotFoundException, TemplateException {
+ loadedTemplates.put(canonicalURI, newTemplate);
+ if (newTemplate instanceof InheritingJobTemplate) {
+ ((InheritingJobTemplate) newTemplate).resolveTemplates(loadedTemplates);
+ }
+ }
+
public Collection<JobTemplate> getSuperTemplates() throws SpecNotFoundException, TemplateException {
ensureTemplatesResolved();
- return ImmutableList.copyOf(this.superTemplates);
+
+ if (superTemplates != null ) {
+ return ImmutableList.copyOf(this.superTemplates);
+ } else {
+ return ImmutableList.of();
+ }
}
@Override
@@ -103,13 +142,14 @@ public abstract class InheritingJobTemplate implements JobTemplate {
private Config getRawTemplateConfigHelper(Set<JobTemplate> alreadyInheritedTemplates)
throws SpecNotFoundException, TemplateException {
Config rawTemplate = getLocalRawTemplate();
- for (JobTemplate template : Lists.reverse(this.superTemplates)) {
- if (!alreadyInheritedTemplates.contains(template)) {
- alreadyInheritedTemplates.add(template);
- Config thisFallback = template instanceof InheritingJobTemplate
- ? ((InheritingJobTemplate) template).getRawTemplateConfigHelper(alreadyInheritedTemplates)
- : template.getRawTemplateConfig();
- rawTemplate = rawTemplate.withFallback(thisFallback);
+ if (this.superTemplates != null) {
+ for (JobTemplate template : Lists.reverse(this.superTemplates)) {
+ if (!alreadyInheritedTemplates.contains(template)) {
+ alreadyInheritedTemplates.add(template);
+ Config thisFallback = template instanceof InheritingJobTemplate ? ((InheritingJobTemplate) template).getRawTemplateConfigHelper(alreadyInheritedTemplates)
+ : template.getRawTemplateConfig();
+ rawTemplate = rawTemplate.withFallback(thisFallback);
+ }
}
}
return rawTemplate;
@@ -136,12 +176,13 @@ public abstract class InheritingJobTemplate implements JobTemplate {
private Set<String> getRequiredConfigListHelper(Set<JobTemplate> alreadyLoadedTemplates)
throws SpecNotFoundException, TemplateException {
Set<String> requiredConfigs = Sets.newHashSet(getLocallyRequiredConfigList());
- for (JobTemplate template : this.superTemplates) {
- if (!alreadyLoadedTemplates.contains(template)) {
- alreadyLoadedTemplates.add(template);
- requiredConfigs.addAll(template instanceof InheritingJobTemplate
- ? ((InheritingJobTemplate) template).getRequiredConfigListHelper(alreadyLoadedTemplates)
- : template.getRequiredConfigList());
+ if (this.superTemplates != null) {
+ for (JobTemplate template : this.superTemplates) {
+ if (!alreadyLoadedTemplates.contains(template)) {
+ alreadyLoadedTemplates.add(template);
+ requiredConfigs.addAll(template instanceof InheritingJobTemplate ? ((InheritingJobTemplate) template).getRequiredConfigListHelper(alreadyLoadedTemplates)
+ : template.getRequiredConfigList());
+ }
}
}
return requiredConfigs;
@@ -158,13 +199,14 @@ public abstract class InheritingJobTemplate implements JobTemplate {
private Config getResolvedConfigHelper(Config userConfig, Set<JobTemplate> alreadyLoadedTemplates)
throws SpecNotFoundException, TemplateException {
Config config = getLocallyResolvedConfig(userConfig);
- for (JobTemplate template : Lists.reverse(this.superTemplates)) {
- if (!alreadyLoadedTemplates.contains(template)) {
- alreadyLoadedTemplates.add(template);
- Config fallback = template instanceof InheritingJobTemplate
- ? ((InheritingJobTemplate) template).getResolvedConfigHelper(config, alreadyLoadedTemplates)
- : template.getResolvedConfig(config);
- config = config.withFallback(fallback);
+ if (superTemplates != null ) {
+ for (JobTemplate template : Lists.reverse(this.superTemplates)) {
+ if (!alreadyLoadedTemplates.contains(template)) {
+ alreadyLoadedTemplates.add(template);
+ Config fallback = template instanceof InheritingJobTemplate ? ((InheritingJobTemplate) template)
+ .getResolvedConfigHelper(config, alreadyLoadedTemplates) : template.getResolvedConfig(config);
+ config = config.withFallback(fallback);
+ }
}
}
return config;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
index 83e65f1..6472383 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
@@ -64,13 +64,23 @@ public class StaticJobTemplate extends InheritingJobTemplate implements SecureJo
private Collection<String> dependencies;
public StaticJobTemplate(URI uri, String version, String description, Config config, JobCatalogWithTemplates catalog)
- throws SpecNotFoundException, TemplateException {
+ throws TemplateException {
this(uri, version, description, config, getSuperTemplateUris(config), catalog);
}
+ /** A constructor that materializes multiple templates into a single static template.
+ * It takes multiple in-memory templates as the input instead of template URIs.
+ * */
+ public StaticJobTemplate(URI uri, String version, String description, Config config, List<JobTemplate> templates) {
+ super(templates, false);
+ this.uri = uri;
+ this.version = version;
+ this.rawConfig = config;
+ this.description = description;
+ }
+
protected StaticJobTemplate(URI uri, String version, String description, Config config, List<URI> superTemplateUris,
- JobCatalogWithTemplates catalog)
- throws SpecNotFoundException, TemplateException {
+ JobCatalogWithTemplates catalog) {
super(superTemplateUris, catalog);
this.uri = uri;
this.version = version;
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index 3715082..1d781c7 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -21,32 +21,25 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.JobContext;
-import org.apache.gobblin.runtime.JobLauncher;
-import org.apache.gobblin.runtime.JobLauncherFactory;
-import org.apache.gobblin.runtime.local.LocalJobLauncher;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.junit.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
-
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.JobLauncherTestHelper;
-import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.local.LocalJobLauncher;
+import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.limiter.BaseLimiterType;
import org.apache.gobblin.util.limiter.DefaultLimiterFactory;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.WriterOutputFormat;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
import com.google.common.io.Closer;
+import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_MULTI_TEMPLATE_KEY;
import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
@@ -100,22 +93,66 @@ public class LocalJobLauncherTest {
jobProps.setProperty("job.name", "beforeResolution");
jobProps.setProperty(GOBBLIN_JOB_TEMPLATE_KEY, "resource:///templates/distcp-ng.template");
+ JobContext jobContext = dummyJobContextInitHelper(jobProps);
+ // Indicating resolution succeeded.
+ // 1) User config not being overloaded by template
+ // 2) Conf that not appearing in the user-config being populated by template
+ System.out.println(jobContext.getJobState());
+ Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+ Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), "/tmp/distcp-persist-dir");
+ }
+
+ @Test
+ public void testMultipleJobTemplatesResoluion() throws Exception {
+ Properties jobProps = loadJobProps();
+ // Job Name shouldn't be overwritten by any templates and the precedence of template is lower than job configuration.
+ String jobId = JobLauncherUtils.newJobId("beforeResolution");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+ jobProps.setProperty("job.name", "beforeResolution");
+ jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
+ "resource:///templates/test.template,resource:///templates/test-overwrite.template");
+
+ JobContext jobContext = dummyJobContextInitHelper(jobProps);
+
+ // Verifying multi-resolution happens and it respects the precedence.
+ Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+ Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x_x");
+ Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y_y");
+
+ // Verifying the order of template list matters.
+ jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
+ "resource:///templates/test-overwrite.template,resource:///templates/test.template");
+ jobContext = dummyJobContextInitHelper(jobProps);
+ Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+ Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x");
+ Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y");
+
+
+ // Verify multi-resolution with inheritance.
+ jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
+ "resource:///templates/test-multitemplate-with-inheritance.template,resource:///templates/test.template");
+ jobContext = dummyJobContextInitHelper(jobProps);
+ Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x");
+ Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y");
+ Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+ // Picked a distcp-specific configuration for which no default value is set in jobConf.
+ Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), "/tmp/distcp-persist-dir");
+ }
+
+ /**
+ * Initialize a jobContext by initializing jobLauncher. This code is mostly used for
+ * testing job templates resolution.
+ */
+ private JobContext dummyJobContextInitHelper(Properties jobProps) throws Exception {
JobContext jobContext = null;
Closer closer = Closer.create();
try {
JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
- jobContext = ((AbstractJobLauncher) jobLauncher).getJobContext();
+ return ((AbstractJobLauncher) jobLauncher).getJobContext();
} finally {
closer.close();
}
-
- // Indicating resolution succeeded.
- // 1) User config not being overloaded by template
- // 2) Conf that not appearing in the user-config being populated by template
- System.out.println(jobContext.getJobState());
- Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
- Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), "/tmp/distcp-persist-dir");
}
@Test
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
index 392c981..1eff130 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
@@ -18,13 +18,17 @@
package org.apache.gobblin.runtime.template;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import org.apache.gobblin.runtime.api.JobTemplate;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -75,6 +79,29 @@ public class StaticJobTemplateTest {
}
@Test
+ public void testMultipleTemplates() throws Exception {
+ Map<String, String> confMap = Maps.newHashMap();
+ confMap.put("key", "value");
+
+ InheritingJobTemplateTest.TestTemplate
+ template1 = new InheritingJobTemplateTest.TestTemplate(new URI("template1"), Lists.<JobTemplate>newArrayList(), ImmutableMap.of("key1", "value1"),
+ ImmutableList.of());
+ InheritingJobTemplateTest.TestTemplate
+ template2 = new InheritingJobTemplateTest.TestTemplate(new URI("template2"), Lists.<JobTemplate>newArrayList(), ImmutableMap.of("key2", "value2"),
+ ImmutableList.of());
+ List<JobTemplate> templateList = new ArrayList<>();
+ templateList.add(template1);
+ templateList.add(template2);
+
+ StaticJobTemplate template =
+ new StaticJobTemplate(new URI("template"), "1", "desc", ConfigFactory.parseMap(confMap), templateList);
+ Config resolved = template.getResolvedConfig(ConfigFactory.empty());
+ Assert.assertEquals(resolved.getString("key"), "value");
+ Assert.assertEquals(resolved.getString("key1"), "value1");
+ Assert.assertEquals(resolved.getString("key2"), "value2");
+ }
+
+ @Test
public void testSecure() throws Exception {
Map<String, Object> confMap = Maps.newHashMap();
confMap.put("nonOverridableKey", "value1");
@@ -82,7 +109,7 @@ public class StaticJobTemplateTest {
confMap.put(StaticJobTemplate.IS_SECURE_KEY, true);
confMap.put(StaticJobTemplate.SECURE_OVERRIDABLE_PROPERTIES_KEYS, "overridableKey, overridableKey2");
- StaticJobTemplate template = new StaticJobTemplate(URI.create("my://template"), "1", "desc", ConfigFactory.parseMap(confMap), null);
+ StaticJobTemplate template = new StaticJobTemplate(URI.create("my://template"), "1", "desc", ConfigFactory.parseMap(confMap), (JobCatalogWithTemplates) null);
Config userConfig = ConfigFactory.parseMap(ImmutableMap.of(
"overridableKey", "override",
diff --git a/gobblin-runtime/src/test/resources/templates/test-multitemplate-with-inheritance.template b/gobblin-runtime/src/test/resources/templates/test-multitemplate-with-inheritance.template
new file mode 100644
index 0000000..11d9a03
--- /dev/null
+++ b/gobblin-runtime/src/test/resources/templates/test-multitemplate-with-inheritance.template
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.template.inherit="resource:///templates/test-overwrite.template,resource:///templates/distcp-ng.template"
diff --git a/gobblin-runtime/src/test/resources/templates/test-overwrite.template b/gobblin-runtime/src/test/resources/templates/test-overwrite.template
new file mode 100644
index 0000000..8806138
--- /dev/null
+++ b/gobblin-runtime/src/test/resources/templates/test-overwrite.template
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.template.required_attributes="required0,required1,required2"
+templated0=x_x
+templated1=y_y
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 414c140..771026c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -34,7 +34,7 @@ import com.google.common.collect.ImmutableList;
/***
* This class provides methods to flatten an Avro Schema to make it more optimal for ORC
- * (Hive does not support predicate pushdown for ORC with nested fields: ETL-7214)
+ * (Hive does not support predicate pushdown for ORC with nested fields)
*
* The behavior of Avro Schema un-nesting is listed below:
*
@@ -120,7 +120,7 @@ public class AvroFlattener {
private static final String FLATTENED_NAME_JOINER = "__";
private static final String FLATTENED_SOURCE_JOINER = ".";
- private static final String FLATTENED_SOURCE_KEY = "flatten_source";
+ public static final String FLATTENED_SOURCE_KEY = "flatten_source";
private String flattenedNameJoiner;
private String flattenedSourceJoiner;