You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/15 22:45:00 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-960] Resolving multiple templates in top-level

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f0218b  [GOBBLIN-960] Resolving multiple templates in top-level
9f0218b is described below

commit 9f0218b276dbf18cd31af11642dcf23230ebcdfc
Author: autumnust <le...@linkedin.com>
AuthorDate: Fri Nov 15 14:44:53 2019 -0800

    [GOBBLIN-960] Resolving multiple templates in top-level
    
    Closes #2809 from autumnust/multiple-templates
---
 .../gobblin/runtime/AbstractJobLauncher.java       |  27 ++++--
 .../org/apache/gobblin/runtime/api/JobSpec.java    |  38 +++++++-
 .../runtime/template/InheritingJobTemplate.java    | 104 +++++++++++++++------
 .../runtime/template/StaticJobTemplate.java        |  16 +++-
 .../gobblin/runtime/LocalJobLauncherTest.java      |  81 +++++++++++-----
 .../runtime/template/StaticJobTemplateTest.java    |  29 +++++-
 .../test-multitemplate-with-inheritance.template   |  18 ++++
 .../resources/templates/test-overwrite.template    |  20 ++++
 .../org/apache/gobblin/util/AvroFlattener.java     |   4 +-
 9 files changed, 267 insertions(+), 70 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index d9241c5..f7cecd4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.runtime;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -116,6 +118,12 @@ public abstract class AbstractJobLauncher implements JobLauncher {
 
   public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
 
+  /** Making {@link AbstractJobLauncher} capable of loading multiple job templates.
+   * Keep the original {@link #GOBBLIN_JOB_TEMPLATE_KEY} for backward-compatibility.
+   * TODO: Expand support to Gobblin-as-a-Service in FlowTemplateCatalog.
+   * */
+  public static final String GOBBLIN_JOB_MULTI_TEMPLATE_KEY = "gobblin.template.uris";
+
   // Job configuration properties
   protected final Properties jobProps;
 
@@ -236,14 +244,21 @@ public abstract class AbstractJobLauncher implements JobLauncher {
   public static void resolveGobblinJobTemplateIfNecessary(Properties jobProps) throws IOException, URISyntaxException,
                                                                                       SpecNotFoundException,
                                                                                       JobTemplate.TemplateException {
+    Config config = ConfigUtils.propertiesToConfig(jobProps);
+    JobSpecResolver resolver = JobSpecResolver.builder(config).build();
+    JobSpec jobSpec = null;
     if (jobProps.containsKey(GOBBLIN_JOB_TEMPLATE_KEY)) {
-      Config config = ConfigUtils.propertiesToConfig(jobProps);
-      JobSpecResolver resolver = JobSpecResolver.builder(config).build();
-
       URI templateUri = new URI(jobProps.getProperty(GOBBLIN_JOB_TEMPLATE_KEY));
-      JobSpec jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
-      ResolvedJobSpec resolvedJob = resolver.resolveJobSpec(jobSpec);
-      jobProps.putAll(ConfigUtils.configToProperties(resolvedJob.getConfig()));
+      jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
+    } else if (jobProps.containsKey(GOBBLIN_JOB_MULTI_TEMPLATE_KEY)) {
+      List<URI> templatesURIs = new ArrayList<>();
+      for (String uri : jobProps.getProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY).split(",")) {
+        templatesURIs.add(new URI(uri));
+      }
+      jobSpec = JobSpec.builder().withConfig(config).withResourceTemplates(templatesURIs).build();
+    }
+    if (jobSpec != null ) {
+      jobProps.putAll(ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpec).getConfig()));
     }
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index a6ca903..2aadad4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -19,9 +19,18 @@ package org.apache.gobblin.runtime.api;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.template.ResourceBasedJobTemplate;
+import org.apache.gobblin.runtime.template.StaticJobTemplate;
+import org.apache.gobblin.util.ConfigUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -29,11 +38,6 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.util.ConfigUtils;
-
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -75,6 +79,7 @@ public class JobSpec implements Configurable, Spec {
 
   /** A Verb identifies if the Spec is for Insert/Update/Delete */
   public static final String VERB_KEY = "Verb";
+  private static final String IN_MEMORY_TEMPLATE_URI = "inmemory";
 
   public static Builder builder(URI jobSpecUri) {
     return new Builder(jobSpecUri);
@@ -304,6 +309,29 @@ public class JobSpec implements Configurable, Spec {
       return this;
     }
 
+    /**
+     * As the public interface of {@link JobSpec} doesn't really support multiple templates,
+     * for incoming list of template uris we will consolidate them as well as resolving. The resolved Config
+     * will not be materialized but reside only in memory through the lifecycle.
+     *
+     * Restriction: This method assumes no customized jobTemplateCatalog and uses classpath resources as the input.
+     * Also, the order of list matters: The former one will be overwritten by the latter.
+     */
+    public Builder withResourceTemplates(List<URI> templateURIs) {
+      try {
+        List<JobTemplate> templates = new ArrayList<>();
+        for(URI uri : templateURIs) {
+          templates.add(ResourceBasedJobTemplate.forResourcePath(uri.getPath()));
+        }
+        this.jobTemplate = Optional.of(new StaticJobTemplate(new URI(IN_MEMORY_TEMPLATE_URI), "", "",
+            ConfigFactory.empty(), templates));
+
+      } catch (URISyntaxException | SpecNotFoundException | JobTemplate.TemplateException | IOException e) {
+        throw new RuntimeException("Fatal exception: Templates couldn't be resolved properly, ", e);
+      }
+      return this;
+    }
+
     public Optional<JobTemplate> getTemplate() {
       return this.jobTemplate;
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java
index d542ed5..dbe5458 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/InheritingJobTemplate.java
@@ -51,11 +51,15 @@ public abstract class InheritingJobTemplate implements JobTemplate {
     this.resolved = false;
   }
 
-  public InheritingJobTemplate(List<JobTemplate> superTemplates) {
+  public InheritingJobTemplate(List<JobTemplate> superTemplates, boolean skipResolve) {
     this.superTemplateUris = Lists.newArrayList();
     this.catalog = null;
     this.superTemplates = superTemplates;
-    this.resolved = true;
+    this.resolved = skipResolve;
+  }
+
+  public InheritingJobTemplate(List<JobTemplate> superTemplates) {
+    this(superTemplates, true);
   }
 
   /**
@@ -71,27 +75,62 @@ public abstract class InheritingJobTemplate implements JobTemplate {
     resolveTemplates(loadedTemplates);
   }
 
+  /**
+   * Resolve all superTemplates being field variables within the class.
+   * There are two types of resolution being involved in this method:
+   * 1) When all templates are being represented as {@link #superTemplateUris}, the actual template will be loaded from
+   * catalog first and enter resolution process. The physical {@link #superTemplates} are being materialized after that.
+   * 2) org.apache.gobblin.runtime.template.InheritingJobTemplate#InheritingJobTemplate(java.util.List, boolean) provides
+   * interface to directly provide physical {@link #superTemplates}. This case is determined by non-null containers of
+   * {@link #superTemplates}.
+   *
+   */
   private void resolveTemplates(Map<URI, JobTemplate> loadedTemplates) throws SpecNotFoundException, TemplateException {
     if (this.resolved) {
       return;
     }
-    this.superTemplates = Lists.newArrayList();
-    for (URI uri : this.superTemplateUris) {
-      if (!loadedTemplates.containsKey(uri)) {
-        JobTemplate newTemplate = this.catalog.getTemplate(uri);
-        loadedTemplates.put(uri, newTemplate);
-        if (newTemplate instanceof InheritingJobTemplate) {
-          ((InheritingJobTemplate) newTemplate).resolveTemplates(loadedTemplates);
+
+    if (this.superTemplateUris.size() > 0) {
+      this.superTemplates = Lists.newArrayList();
+      for (URI uri : this.superTemplateUris) {
+        if (!loadedTemplates.containsKey(uri)) {
+          JobTemplate newTemplate = this.catalog.getTemplate(uri);
+          resolveTemplateRecursionHelper(newTemplate, uri, loadedTemplates);
+        }
+        this.superTemplates.add(loadedTemplates.get(uri));
+      }
+    } else if (superTemplates != null ) {
+      for (JobTemplate newTemplate : this.superTemplates) {
+        if (!loadedTemplates.containsKey(newTemplate.getUri())) {
+          resolveTemplateRecursionHelper(newTemplate, newTemplate.getUri(), loadedTemplates);
         }
       }
-      this.superTemplates.add(loadedTemplates.get(uri));
     }
+
     this.resolved = true;
   }
 
+  /**
+   * The canonicalURI needs to be there when the template needs to loaded from catalog, as the format are adjusted
+   * while constructing the template.
+   * Essentially, jobCatalog.load(templateUris.get(0)).getUri().equal(templateUris.get(0)) return false.
+   */
+  private void resolveTemplateRecursionHelper(JobTemplate newTemplate, URI canonicalURI, Map<URI, JobTemplate> loadedTemplates)
+      throws SpecNotFoundException, TemplateException {
+    loadedTemplates.put(canonicalURI, newTemplate);
+    if (newTemplate instanceof InheritingJobTemplate) {
+      ((InheritingJobTemplate) newTemplate).resolveTemplates(loadedTemplates);
+    }
+  }
+
   public Collection<JobTemplate> getSuperTemplates() throws SpecNotFoundException, TemplateException {
     ensureTemplatesResolved();
-    return ImmutableList.copyOf(this.superTemplates);
+
+    if (superTemplates != null ) {
+      return ImmutableList.copyOf(this.superTemplates);
+    } else {
+      return ImmutableList.of();
+    }
   }
 
   @Override
@@ -103,13 +142,14 @@ public abstract class InheritingJobTemplate implements JobTemplate {
   private Config getRawTemplateConfigHelper(Set<JobTemplate> alreadyInheritedTemplates)
       throws SpecNotFoundException, TemplateException {
     Config rawTemplate = getLocalRawTemplate();
-    for (JobTemplate template : Lists.reverse(this.superTemplates)) {
-      if (!alreadyInheritedTemplates.contains(template)) {
-        alreadyInheritedTemplates.add(template);
-        Config thisFallback = template instanceof InheritingJobTemplate
-            ? ((InheritingJobTemplate) template).getRawTemplateConfigHelper(alreadyInheritedTemplates)
-            : template.getRawTemplateConfig();
-        rawTemplate = rawTemplate.withFallback(thisFallback);
+    if (this.superTemplates != null) {
+      for (JobTemplate template : Lists.reverse(this.superTemplates)) {
+        if (!alreadyInheritedTemplates.contains(template)) {
+          alreadyInheritedTemplates.add(template);
+          Config thisFallback = template instanceof InheritingJobTemplate ? ((InheritingJobTemplate) template).getRawTemplateConfigHelper(alreadyInheritedTemplates)
+              : template.getRawTemplateConfig();
+          rawTemplate = rawTemplate.withFallback(thisFallback);
+        }
       }
     }
     return rawTemplate;
@@ -136,12 +176,13 @@ public abstract class InheritingJobTemplate implements JobTemplate {
   private Set<String> getRequiredConfigListHelper(Set<JobTemplate> alreadyLoadedTemplates)
       throws SpecNotFoundException, TemplateException {
     Set<String> requiredConfigs = Sets.newHashSet(getLocallyRequiredConfigList());
-    for (JobTemplate template : this.superTemplates) {
-      if (!alreadyLoadedTemplates.contains(template)) {
-        alreadyLoadedTemplates.add(template);
-        requiredConfigs.addAll(template instanceof InheritingJobTemplate
-            ? ((InheritingJobTemplate) template).getRequiredConfigListHelper(alreadyLoadedTemplates)
-            : template.getRequiredConfigList());
+    if (this.superTemplates != null) {
+      for (JobTemplate template : this.superTemplates) {
+        if (!alreadyLoadedTemplates.contains(template)) {
+          alreadyLoadedTemplates.add(template);
+          requiredConfigs.addAll(template instanceof InheritingJobTemplate ? ((InheritingJobTemplate) template).getRequiredConfigListHelper(alreadyLoadedTemplates)
+              : template.getRequiredConfigList());
+        }
       }
     }
     return requiredConfigs;
@@ -158,13 +199,14 @@ public abstract class InheritingJobTemplate implements JobTemplate {
   private Config getResolvedConfigHelper(Config userConfig, Set<JobTemplate> alreadyLoadedTemplates)
       throws SpecNotFoundException, TemplateException {
     Config config = getLocallyResolvedConfig(userConfig);
-    for (JobTemplate template : Lists.reverse(this.superTemplates)) {
-      if (!alreadyLoadedTemplates.contains(template)) {
-        alreadyLoadedTemplates.add(template);
-        Config fallback = template instanceof InheritingJobTemplate
-            ? ((InheritingJobTemplate) template).getResolvedConfigHelper(config, alreadyLoadedTemplates)
-            : template.getResolvedConfig(config);
-        config = config.withFallback(fallback);
+    if (superTemplates != null ) {
+      for (JobTemplate template : Lists.reverse(this.superTemplates)) {
+        if (!alreadyLoadedTemplates.contains(template)) {
+          alreadyLoadedTemplates.add(template);
+          Config fallback = template instanceof InheritingJobTemplate ? ((InheritingJobTemplate) template)
+              .getResolvedConfigHelper(config, alreadyLoadedTemplates) : template.getResolvedConfig(config);
+          config = config.withFallback(fallback);
+        }
       }
     }
     return config;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
index 83e65f1..6472383 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/template/StaticJobTemplate.java
@@ -64,13 +64,23 @@ public class StaticJobTemplate extends InheritingJobTemplate implements SecureJo
   private Collection<String> dependencies;
 
   public StaticJobTemplate(URI uri, String version, String description, Config config, JobCatalogWithTemplates catalog)
-      throws SpecNotFoundException, TemplateException {
+      throws TemplateException {
     this(uri, version, description, config, getSuperTemplateUris(config), catalog);
   }
 
+  /** An constructor that materialize multiple templates into a single static template
+   * The constructor provided multiple in-memory templates as the input instead of templateURIs
+   * */
+  public StaticJobTemplate(URI uri, String version, String description, Config config, List<JobTemplate> templates) {
+    super(templates, false);
+    this.uri = uri;
+    this.version = version;
+    this.rawConfig = config;
+    this.description = description;
+  }
+
   protected StaticJobTemplate(URI uri, String version, String description, Config config, List<URI> superTemplateUris,
-      JobCatalogWithTemplates catalog)
-      throws SpecNotFoundException, TemplateException {
+      JobCatalogWithTemplates catalog) {
     super(superTemplateUris, catalog);
     this.uri = uri;
     this.version = version;
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
index 3715082..1d781c7 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/LocalJobLauncherTest.java
@@ -21,32 +21,25 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.JobContext;
-import org.apache.gobblin.runtime.JobLauncher;
-import org.apache.gobblin.runtime.JobLauncherFactory;
-import org.apache.gobblin.runtime.local.LocalJobLauncher;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.junit.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
-
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.JobLauncherTestHelper;
-import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.local.LocalJobLauncher;
+import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.limiter.BaseLimiterType;
 import org.apache.gobblin.util.limiter.DefaultLimiterFactory;
 import org.apache.gobblin.writer.Destination;
 import org.apache.gobblin.writer.WriterOutputFormat;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 import com.google.common.io.Closer;
 
+import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_MULTI_TEMPLATE_KEY;
 import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
 
 
@@ -100,22 +93,66 @@ public class LocalJobLauncherTest {
     jobProps.setProperty("job.name", "beforeResolution");
     jobProps.setProperty(GOBBLIN_JOB_TEMPLATE_KEY, "resource:///templates/distcp-ng.template");
 
+    JobContext jobContext = dummyJobContextInitHelper(jobProps);
 
+    // Indicating resolution succeeded.
+    // 1) User config not being overloaded by template
+    // 2) Conf that not appearing in the user-config being populated by template
+    System.out.println(jobContext.getJobState());
+    Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+    Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), "/tmp/distcp-persist-dir");
+  }
+
+  @Test
+  public void testMultipleJobTemplatesResoluion() throws Exception {
+    Properties jobProps = loadJobProps();
+    // Job Name shouldn't be overwritten by any templates and the precedence of template is lower than job configuration.
+    String jobId = JobLauncherUtils.newJobId("beforeResolution");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+    jobProps.setProperty("job.name", "beforeResolution");
+    jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
+        "resource:///templates/test.template,resource:///templates/test-overwrite.template");
+
+    JobContext jobContext = dummyJobContextInitHelper(jobProps);
+
+    // Verifying multi-resolution happens and it respect the precedency.
+    Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+    Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x_x");
+    Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y_y");
+
+    // Verifying the order of template list matters.
+    jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
+        "resource:///templates/test-overwrite.template,resource:///templates/test.template");
+    jobContext = dummyJobContextInitHelper(jobProps);
+    Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+    Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x");
+    Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y");
+
+
+    // Verify multi-resolution with inheritance.
+    jobProps.setProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY,
+        "resource:///templates/test-multitemplate-with-inheritance.template,resource:///templates/test.template");
+    jobContext = dummyJobContextInitHelper(jobProps);
+    Assert.assertEquals(jobContext.getJobState().getProp("templated0"), "x");
+    Assert.assertEquals(jobContext.getJobState().getProp("templated1"), "y");
+    Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
+    // Picked an distcp-specific configuration that there's no default value being set in jobConf.
+    Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), "/tmp/distcp-persist-dir");
+  }
+
+  /**
+   * Initialize a jobContext by initializing jobLauncher. This code is mostly used for
+   * testing job templates resolution.
+   */
+  private JobContext dummyJobContextInitHelper(Properties jobProps) throws Exception {
     JobContext jobContext = null;
     Closer closer = Closer.create();
     try {
       JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
-      jobContext = ((AbstractJobLauncher) jobLauncher).getJobContext();
+      return ((AbstractJobLauncher) jobLauncher).getJobContext();
     } finally {
       closer.close();
     }
-
-    // Indicating resolution succeeded.
-    // 1) User config not being overloaded by template
-    // 2) Conf that not appearing in the user-config being populated by template
-    System.out.println(jobContext.getJobState());
-    Assert.assertEquals(jobContext.getJobState().getProp("job.name"), "beforeResolution");
-    Assert.assertEquals(jobContext.getJobState().getProp("distcp.persist.dir"), "/tmp/distcp-persist-dir");
   }
 
   @Test
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
index 392c981..1eff130 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/template/StaticJobTemplateTest.java
@@ -18,13 +18,17 @@
 package org.apache.gobblin.runtime.template;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.gobblin.runtime.api.JobTemplate;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -75,6 +79,29 @@ public class StaticJobTemplateTest {
   }
 
   @Test
+  public void testMultipleTemplates() throws Exception {
+    Map<String, String> confMap = Maps.newHashMap();
+    confMap.put("key", "value");
+
+    InheritingJobTemplateTest.TestTemplate
+        template1 = new InheritingJobTemplateTest.TestTemplate(new URI("template1"), Lists.<JobTemplate>newArrayList(), ImmutableMap.of("key1", "value1"),
+        ImmutableList.of());
+    InheritingJobTemplateTest.TestTemplate
+        template2 = new InheritingJobTemplateTest.TestTemplate(new URI("template2"), Lists.<JobTemplate>newArrayList(), ImmutableMap.of("key2", "value2"),
+        ImmutableList.of());
+    List<JobTemplate> templateList = new ArrayList<>();
+    templateList.add(template1);
+    templateList.add(template2);
+
+    StaticJobTemplate template =
+        new StaticJobTemplate(new URI("template"), "1", "desc", ConfigFactory.parseMap(confMap), templateList);
+    Config resolved = template.getResolvedConfig(ConfigFactory.empty());
+    Assert.assertEquals(resolved.getString("key"), "value");
+    Assert.assertEquals(resolved.getString("key1"), "value1");
+    Assert.assertEquals(resolved.getString("key2"), "value2");
+  }
+
+  @Test
   public void testSecure() throws Exception {
     Map<String, Object> confMap = Maps.newHashMap();
     confMap.put("nonOverridableKey", "value1");
@@ -82,7 +109,7 @@ public class StaticJobTemplateTest {
     confMap.put(StaticJobTemplate.IS_SECURE_KEY, true);
     confMap.put(StaticJobTemplate.SECURE_OVERRIDABLE_PROPERTIES_KEYS, "overridableKey, overridableKey2");
 
-    StaticJobTemplate template = new StaticJobTemplate(URI.create("my://template"), "1", "desc", ConfigFactory.parseMap(confMap), null);
+    StaticJobTemplate template = new StaticJobTemplate(URI.create("my://template"), "1", "desc", ConfigFactory.parseMap(confMap), (JobCatalogWithTemplates) null);
 
     Config userConfig = ConfigFactory.parseMap(ImmutableMap.of(
         "overridableKey", "override",
diff --git a/gobblin-runtime/src/test/resources/templates/test-multitemplate-with-inheritance.template b/gobblin-runtime/src/test/resources/templates/test-multitemplate-with-inheritance.template
new file mode 100644
index 0000000..11d9a03
--- /dev/null
+++ b/gobblin-runtime/src/test/resources/templates/test-multitemplate-with-inheritance.template
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.template.inherit="resource:///templates/test-overwrite.template,resource:///templates/distcp-ng.template"
diff --git a/gobblin-runtime/src/test/resources/templates/test-overwrite.template b/gobblin-runtime/src/test/resources/templates/test-overwrite.template
new file mode 100644
index 0000000..8806138
--- /dev/null
+++ b/gobblin-runtime/src/test/resources/templates/test-overwrite.template
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gobblin.template.required_attributes="required0,required1,required2"
+templated0=x_x
+templated1=y_y
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 414c140..771026c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -34,7 +34,7 @@ import com.google.common.collect.ImmutableList;
 
 /***
  * This class provides methods to flatten an Avro Schema to make it more optimal for ORC
- * (Hive does not support predicate pushdown for ORC with nested fields: ETL-7214)
+ * (Hive does not support predicate pushdown for ORC with nested fields)
  *
  * The behavior of Avro Schema un-nesting is listed below:
  *
@@ -120,7 +120,7 @@ public class AvroFlattener {
 
   private static final String FLATTENED_NAME_JOINER = "__";
   private static final String FLATTENED_SOURCE_JOINER = ".";
-  private static final String FLATTENED_SOURCE_KEY = "flatten_source";
+  public static final String FLATTENED_SOURCE_KEY = "flatten_source";
 
   private String flattenedNameJoiner;
   private String flattenedSourceJoiner;