You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/21 03:51:57 UTC

incubator-gobblin git commit: [GOBBLIN-317] Add dynamic configuration injection in the mappers

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c8707eded -> 5c03b11b5


[GOBBLIN-317] Add dynamic configuration injection in the mappers

Closes #2170 from htran1/dynamic_config


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c03b11b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c03b11b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c03b11b

Branch: refs/heads/master
Commit: 5c03b11b5b5a62a611fa2a1042bfe64c283c6a5e
Parents: c8707ed
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Nov 20 19:51:48 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Nov 20 19:51:48 2017 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  8 ++-
 .../configuration/DynamicConfigGenerator.java   | 33 ++++++++++++
 .../NoopDynamicConfigGenerator.java             | 36 +++++++++++++
 .../gobblin/azkaban/AzkabanJobLauncher.java     | 15 ++++++
 .../kafka/client/Kafka09ConsumerClient.java     |  7 ++-
 .../metrics/kafka/KafkaAvroReporter.java        |  6 +--
 .../gobblin/metrics/kafka/KafkaReporter.java    | 16 +++++-
 .../metrics/kafka/KafkaReporterFactory.java     |  9 +++-
 .../runtime/DynamicConfigGeneratorFactory.java  | 53 ++++++++++++++++++++
 .../runtime/mapreduce/MRJobLauncher.java        | 26 ++++++++--
 .../gobblin/runtime/JobLauncherTestHelper.java  |  2 +
 .../runtime/mapreduce/MRJobLauncherTest.java    | 24 +++++++++
 12 files changed, 220 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a563b43..3e576ce 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -205,6 +205,12 @@ public class ConfigurationKeys {
   public static final String DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY = "noop";
 
   /**
+   * Configuration for dynamic configuration generation
+   */
+  public static final String DYNAMIC_CONFIG_GENERATOR_CLASS_KEY = "dynamicConfigGenerator.class";
+  public static final String DEFAULT_DYNAMIC_CONFIG_GENERATOR_CLASS_KEY = "noop";
+
+  /**
    * Configuration properties used internally.
    */
   public static final String JOB_ID_KEY = "job.id";
@@ -734,7 +740,7 @@ public class ConfigurationKeys {
   public static final boolean DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT = false;
   public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP = "kakfa.source.avgFetchTimeCap";
   public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100;
-
+  public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig";
 
   /**
    * Job execution info server and history store configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java
new file mode 100644
index 0000000..9783fab
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.configuration;
+
+import com.typesafe.config.Config;
+
+/**
+ * For generating dynamic configuration that gets added to the job configuration.
+ * These are configuration values that cannot be determined statically at job specification time.
+ * One example is the SSL certificate location of a certificate that is fetched at runtime.
+ */
+public interface DynamicConfigGenerator {
+  /**
+   * Generate dynamic configuration that should be added to the job configuration.
+   * @param config configuration
+   * @return config object with the dynamic configuration
+   */
+  Config generateDynamicConfig(Config config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java
new file mode 100644
index 0000000..c990690
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.configuration;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.annotation.Alias;
+
+
+/**
+ * NoOp dynamic config generator that returns an empty {@link Config}
+ */
+@Alias("noop")
+public class NoopDynamicConfigGenerator implements DynamicConfigGenerator {
+
+  public NoopDynamicConfigGenerator() {
+  }
+
+  public Config generateDynamicConfig(Config config) {
+    return ConfigFactory.empty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 20b630b..5c9fc1d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -46,12 +47,15 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobLauncherFactory;
@@ -128,6 +132,17 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
     this.props = new Properties();
     this.props.putAll(props);
 
+    // load dynamic configuration and add it to the job properties
+    Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
+    DynamicConfigGenerator dynamicConfigGenerator =
+        DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);
+    Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);
+
+    // add the dynamic config to the job config
+    for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
+      this.props.put(entry.getKey(), entry.getValue().unwrapped().toString());
+    }
+
     Configuration conf = new Configuration();
 
     String fsUri = conf.get(HADOOP_FS_DEFAULT_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 7f83192..5ab27e0 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -44,6 +44,7 @@ import javax.annotation.Nonnull;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
@@ -96,8 +97,10 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
 
     // grab all the config under "source.kafka" and add the defaults as fallback.
     Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
-    // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka
-    Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG);
+    // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka with a fallback to the
+    // shared config that starts with "gobblin.kafka.sharedConfig"
+    Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
+        ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
     // The specific config overrides settings in the base config
     Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
     props.putAll(ConfigUtils.configToProperties(scopedConfig));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
index 4b6399b..5cb3ce8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
@@ -25,13 +25,11 @@ import org.apache.avro.Schema;
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricReport;
 import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
 import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
 import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -99,7 +97,9 @@ public class KafkaAvroReporter extends KafkaReporter {
     public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
       this.brokers = brokers;
       this.topic = topic;
-      return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+
+      // create a KafkaAvroReporter with metrics.* and gobblin.kafka.sharedConfig.* keys
+      return new KafkaAvroReporter(this, KafkaReporter.getKafkaAndMetricsConfigFromProperties(props));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
index 1c935b4..40a9fed 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
@@ -72,7 +72,9 @@ public class KafkaReporter extends MetricReportReporter {
     if (builder.kafkaPusher.isPresent()) {
       this.kafkaPusher = builder.kafkaPusher.get();
     } else {
-      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+          .withFallback(ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
       String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
           PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
 
@@ -85,6 +87,15 @@ public class KafkaReporter extends MetricReportReporter {
   }
 
   /**
+   * Get config with metrics configuration and shared kafka configuration
+   */
+  public static Config getKafkaAndMetricsConfigFromProperties(Properties props) {
+    return ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))
+        .withFallback(ConfigUtils.propertiesToConfig(props,
+            Optional.of(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)));
+  }
+
+  /**
    * A static factory class for obtaining new {@link Builder}s
    *
    * @see Builder
@@ -139,7 +150,8 @@ public class KafkaReporter extends MetricReportReporter {
       this.brokers = brokers;
       this.topic = topic;
 
-      return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+      // create a KafkaReporter with metrics.* and gobblin.kafka.sharedConfig.* keys
+      return new KafkaReporter(this, KafkaReporter.getKafkaAndMetricsConfigFromProperties(props));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 9faac33..4dcf717 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -94,8 +94,13 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
         KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
             properties);
 
-        Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties),
-            PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+        Config allConfig = ConfigUtils.propertiesToConfig(properties);
+        // the kafka configuration is composed of the metrics reporting specific keys with a fallback to the shared
+        // kafka config
+        Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+            PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+            ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
         builder.withConfig(kafkaConfig);
 
         builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java
new file mode 100644
index 0000000..c424a9c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * For getting an instance of a {@link DynamicConfigGenerator}
+ */
+public class DynamicConfigGeneratorFactory {
+  /**
+   * Get an instance of a {@link DynamicConfigGenerator}
+   * @param config {@link Config} from which the generator class name or alias is read
+   * @return an instance of {@link DynamicConfigGenerator}
+   */
+  public static DynamicConfigGenerator createDynamicConfigGenerator(Config config) {
+    String dynamicConfigGeneratorClassName =
+        ConfigUtils.getString(config, ConfigurationKeys.DYNAMIC_CONFIG_GENERATOR_CLASS_KEY,
+            ConfigurationKeys.DEFAULT_DYNAMIC_CONFIG_GENERATOR_CLASS_KEY);
+
+    try {
+      ClassAliasResolver<DynamicConfigGenerator> aliasResolver =
+          new ClassAliasResolver<>(DynamicConfigGenerator.class);
+      return aliasResolver.resolveClass(dynamicConfigGeneratorClassName).newInstance();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Could not construct DynamicConfigGenerator " +
+          dynamicConfigGeneratorClassName, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 9f17db1..306b4ef 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -21,10 +21,9 @@ import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -56,7 +55,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ServiceManager;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValue;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -64,6 +65,7 @@ import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.GobblinMetrics;
@@ -71,6 +73,7 @@ import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
 import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.runtime.JobState;
@@ -83,6 +86,7 @@ import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.runtime.util.MetricGroup;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
@@ -626,7 +630,21 @@ public class MRJobLauncher extends AbstractJobLauncher {
         throw new RuntimeException("Failed to setup the mapper task", ioe);
       }
 
-      this.taskExecutor = new TaskExecutor(context.getConfiguration());
+
+      // load dynamic configuration to add to the job configuration
+      Configuration configuration = context.getConfiguration();
+      Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties());
+      DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator(
+          jobStateAsConfig);
+      Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig);
+
+      // add the dynamic config to the job config
+      for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
+        this.jobState.setProp(entry.getKey(), entry.getValue().unwrapped().toString());
+        configuration.set(entry.getKey(), entry.getValue().unwrapped().toString());
+      }
+
+      this.taskExecutor = new TaskExecutor(configuration);
       this.taskStateTracker = new MRTaskStateTracker(context);
       this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
       try {
@@ -636,8 +654,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
         throw new RuntimeException(te);
       }
 
-      Configuration configuration = context.getConfiguration();
-
       // Setup and start metrics reporting if metric reporting is enabled
       if (Boolean.valueOf(
           configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 497fd88..c947dfc 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -50,6 +50,8 @@ import org.apache.gobblin.util.JobLauncherUtils;
 public class JobLauncherTestHelper {
 
   public static final String SOURCE_FILE_LIST_KEY = "source.files";
+  public static final String DYNAMIC_KEY1 = "DynamicKey1";
+  public static final String DYNAMIC_VALUE1 = "DynamicValue1";
 
   private final StateStore<JobState.DatasetState> datasetStateStore;
   private final Properties launcherProps;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 60e71a7..8d4f308 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -33,7 +33,12 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
@@ -271,6 +276,10 @@ public class MRJobLauncherTest extends BMNGRunner {
       @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore",
           targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")",
           action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 + 1"),
+      @BMRule(name = "checkProp", targetClass = "org.apache.gobblin.runtime.mapreduce.MRJobLauncher$TaskRunner",
+          targetMethod = "setup", targetLocation = "AT EXIT",
+          condition = "!$0.jobState.getProp(\"DynamicKey1\").equals(\"DynamicValue1\")",
+          action = "throw new RuntimeException(\"could not find key\")"),
       @BMRule(name = "writeSuccessFile", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt",
           targetMethod = "taskSuccessfulInPriorAttempt", targetLocation = "AFTER WRITE $taskStateStore",
           condition = "$1.endsWith(\"_1\")",
@@ -281,6 +290,10 @@ public class MRJobLauncherTest extends BMNGRunner {
     jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
         jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndSucFile");
     jobProps.setProperty("use.multiworkunit", Boolean.toString(true));
+
+    jobProps.setProperty("dynamicConfigGenerator.class",
+        "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest$TestDynamicConfigGenerator");
+
     try {
       this.jobLauncherTestHelper.runTestWithSkippedTask(jobProps, "_1");
 
@@ -366,4 +379,15 @@ public class MRJobLauncherTest extends BMNGRunner {
 
     return jobProps;
   }
+
+  public static class TestDynamicConfigGenerator implements DynamicConfigGenerator {
+    public TestDynamicConfigGenerator() {
+    }
+
+    @Override
+    public Config generateDynamicConfig(Config config) {
+      return ConfigFactory.parseMap(ImmutableMap.of(JobLauncherTestHelper.DYNAMIC_KEY1,
+          JobLauncherTestHelper.DYNAMIC_VALUE1));
+    }
+  }
 }