You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/21 03:51:57 UTC
incubator-gobblin git commit: [GOBBLIN-317] Add dynamic configuration
injection in the mappers
Repository: incubator-gobblin
Updated Branches:
refs/heads/master c8707eded -> 5c03b11b5
[GOBBLIN-317] Add dynamic configuration injection in the mappers
Closes #2170 from htran1/dynamic_config
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c03b11b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c03b11b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c03b11b
Branch: refs/heads/master
Commit: 5c03b11b5b5a62a611fa2a1042bfe64c283c6a5e
Parents: c8707ed
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Nov 20 19:51:48 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Nov 20 19:51:48 2017 -0800
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 8 ++-
.../configuration/DynamicConfigGenerator.java | 33 ++++++++++++
.../NoopDynamicConfigGenerator.java | 36 +++++++++++++
.../gobblin/azkaban/AzkabanJobLauncher.java | 15 ++++++
.../kafka/client/Kafka09ConsumerClient.java | 7 ++-
.../metrics/kafka/KafkaAvroReporter.java | 6 +--
.../gobblin/metrics/kafka/KafkaReporter.java | 16 +++++-
.../metrics/kafka/KafkaReporterFactory.java | 9 +++-
.../runtime/DynamicConfigGeneratorFactory.java | 53 ++++++++++++++++++++
.../runtime/mapreduce/MRJobLauncher.java | 26 ++++++++--
.../gobblin/runtime/JobLauncherTestHelper.java | 2 +
.../runtime/mapreduce/MRJobLauncherTest.java | 24 +++++++++
12 files changed, 220 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a563b43..3e576ce 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -205,6 +205,12 @@ public class ConfigurationKeys {
public static final String DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY = "noop";
/**
+ * Configuration for dynamic configuration generation
+ */
+ public static final String DYNAMIC_CONFIG_GENERATOR_CLASS_KEY = "dynamicConfigGenerator.class";
+ public static final String DEFAULT_DYNAMIC_CONFIG_GENERATOR_CLASS_KEY = "noop";
+
+ /**
* Configuration properties used internally.
*/
public static final String JOB_ID_KEY = "job.id";
@@ -734,7 +740,7 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT = false;
public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP = "kakfa.source.avgFetchTimeCap";
public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100;
-
+ public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig";
/**
* Job execution info server and history store configuration properties.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java
new file mode 100644
index 0000000..9783fab
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.configuration;
+
+import com.typesafe.config.Config;
+
+/**
+ * For generating dynamic configuration that gets added to the job configuration.
+ * These are configuration values that cannot be determined statically at job specification time.
+ * One example is the location of an SSL certificate that is fetched at runtime.
+ */
+public interface DynamicConfigGenerator {
+ /**
+ * Generate dynamic configuration that should be added to the job configuration.
+ * @param config configuration
+ * @return config object with the dynamic configuration
+ */
+ Config generateDynamicConfig(Config config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java
new file mode 100644
index 0000000..c990690
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.configuration;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.annotation.Alias;
+
+
+/**
+ * NoOp dynamic config generator that returns an empty {@link Config}
+ */
+@Alias("noop")
+public class NoopDynamicConfigGenerator implements DynamicConfigGenerator {
+
+ public NoopDynamicConfigGenerator() {
+ }
+
+ public Config generateDynamicConfig(Config config) {
+ return ConfigFactory.empty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 20b630b..5c9fc1d 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -46,12 +47,15 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobLauncherFactory;
@@ -128,6 +132,17 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
this.props = new Properties();
this.props.putAll(props);
+ // load dynamic configuration and add them to the job properties
+ Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
+ DynamicConfigGenerator dynamicConfigGenerator =
+ DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig);
+ Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);
+
+ // add the dynamic config to the job config
+ for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
+ this.props.put(entry.getKey(), entry.getValue().unwrapped().toString());
+ }
+
Configuration conf = new Configuration();
String fsUri = conf.get(HADOOP_FS_DEFAULT_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 7f83192..5ab27e0 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -44,6 +44,7 @@ import javax.annotation.Nonnull;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
@@ -96,8 +97,10 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
// grab all the config under "source.kafka" and add the defaults as fallback.
Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
- // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka
- Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG);
+ // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka with a fallback to the
+ // shared config that starts with "gobblin.kafka.sharedConfig"
+ Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
+ ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
// The specific config overrides settings in the base config
Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
props.putAll(ConfigUtils.configToProperties(scopedConfig));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
index 4b6399b..5cb3ce8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java
@@ -25,13 +25,11 @@ import org.apache.avro.Schema;
import com.google.common.base.Optional;
import com.typesafe.config.Config;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricReport;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-import org.apache.gobblin.util.ConfigUtils;
/**
@@ -99,7 +97,9 @@ public class KafkaAvroReporter extends KafkaReporter {
public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException {
this.brokers = brokers;
this.topic = topic;
- return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+
+ // create a KafkaAvroReporter with metrics.* and gobblin.kafka.sharedConfig.* keys
+ return new KafkaAvroReporter(this, KafkaReporter.getKafkaAndMetricsConfigFromProperties(props));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
index 1c935b4..40a9fed 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
@@ -72,7 +72,9 @@ public class KafkaReporter extends MetricReportReporter {
if (builder.kafkaPusher.isPresent()) {
this.kafkaPusher = builder.kafkaPusher.get();
} else {
- Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+ Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
+ .withFallback(ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
@@ -85,6 +87,15 @@ public class KafkaReporter extends MetricReportReporter {
}
/**
+ * Get config with metrics configuration and shared kafka configuration
+ */
+ public static Config getKafkaAndMetricsConfigFromProperties(Properties props) {
+ return ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))
+ .withFallback(ConfigUtils.propertiesToConfig(props,
+ Optional.of(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)));
+ }
+
+ /**
* A static factory class for obtaining new {@link Builder}s
*
* @see Builder
@@ -139,7 +150,8 @@ public class KafkaReporter extends MetricReportReporter {
this.brokers = brokers;
this.topic = topic;
- return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+ // create a KafkaReporter with metrics.* and gobblin.kafka.sharedConfig.* keys
+ return new KafkaReporter(this, KafkaReporter.getKafkaAndMetricsConfigFromProperties(props));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 9faac33..4dcf717 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -94,8 +94,13 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
properties);
- Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties),
- PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+ Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ // the Kafka configuration is composed of the metrics-reporting-specific keys with a fallback to the shared
+ // Kafka config
+ Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+ PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+ ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
builder.withConfig(kafkaConfig);
builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java
new file mode 100644
index 0000000..c424a9c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * For getting an instance of a {@link DynamicConfigGenerator}
+ */
+public class DynamicConfigGeneratorFactory {
+ /**
+ * Get an instance of a {@link DynamicConfigGenerator}
+ * @param config {@link Config} to pass to the constructor
+ * @return an instance of {@link DynamicConfigGenerator}
+ */
+ public static DynamicConfigGenerator createDynamicConfigGenerator(Config config) {
+ String dynamicConfigGeneratorClassName =
+ ConfigUtils.getString(config, ConfigurationKeys.DYNAMIC_CONFIG_GENERATOR_CLASS_KEY,
+ ConfigurationKeys.DEFAULT_DYNAMIC_CONFIG_GENERATOR_CLASS_KEY);
+
+ try {
+ ClassAliasResolver<DynamicConfigGenerator> aliasResolver =
+ new ClassAliasResolver<>(DynamicConfigGenerator.class);
+ return aliasResolver.resolveClass(dynamicConfigGeneratorClassName).newInstance();
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Could not construct DynamicConfigGenerator " +
+ dynamicConfigGeneratorClassName, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 9f17db1..306b4ef 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -21,10 +21,9 @@ import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
-import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -56,7 +55,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ServiceManager;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValue;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -64,6 +65,7 @@ import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.GobblinMetrics;
@@ -71,6 +73,7 @@ import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
@@ -83,6 +86,7 @@ import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.MetricGroup;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -626,7 +630,21 @@ public class MRJobLauncher extends AbstractJobLauncher {
throw new RuntimeException("Failed to setup the mapper task", ioe);
}
- this.taskExecutor = new TaskExecutor(context.getConfiguration());
+
+ // load dynamic configuration to add to the job configuration
+ Configuration configuration = context.getConfiguration();
+ Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties());
+ DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator(
+ jobStateAsConfig);
+ Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig);
+
+ // add the dynamic config to the job config
+ for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
+ this.jobState.setProp(entry.getKey(), entry.getValue().unwrapped().toString());
+ configuration.set(entry.getKey(), entry.getValue().unwrapped().toString());
+ }
+
+ this.taskExecutor = new TaskExecutor(configuration);
this.taskStateTracker = new MRTaskStateTracker(context);
this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker));
try {
@@ -636,8 +654,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
throw new RuntimeException(te);
}
- Configuration configuration = context.getConfiguration();
-
// Setup and start metrics reporting if metric reporting is enabled
if (Boolean.valueOf(
configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 497fd88..c947dfc 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -50,6 +50,8 @@ import org.apache.gobblin.util.JobLauncherUtils;
public class JobLauncherTestHelper {
public static final String SOURCE_FILE_LIST_KEY = "source.files";
+ public static final String DYNAMIC_KEY1 = "DynamicKey1";
+ public static final String DYNAMIC_VALUE1 = "DynamicValue1";
private final StateStore<JobState.DatasetState> datasetStateStore;
private final Properties launcherProps;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index 60e71a7..8d4f308 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -33,7 +33,12 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
@@ -271,6 +276,10 @@ public class MRJobLauncherTest extends BMNGRunner {
@BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore",
targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")",
action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 + 1"),
+ @BMRule(name = "checkProp", targetClass = "org.apache.gobblin.runtime.mapreduce.MRJobLauncher$TaskRunner",
+ targetMethod = "setup", targetLocation = "AT EXIT",
+ condition = "!$0.jobState.getProp(\"DynamicKey1\").equals(\"DynamicValue1\")",
+ action = "throw new RuntimeException(\"could not find key\")"),
@BMRule(name = "writeSuccessFile", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt",
targetMethod = "taskSuccessfulInPriorAttempt", targetLocation = "AFTER WRITE $taskStateStore",
condition = "$1.endsWith(\"_1\")",
@@ -281,6 +290,10 @@ public class MRJobLauncherTest extends BMNGRunner {
jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndSucFile");
jobProps.setProperty("use.multiworkunit", Boolean.toString(true));
+
+ jobProps.setProperty("dynamicConfigGenerator.class",
+ "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest$TestDynamicConfigGenerator");
+
try {
this.jobLauncherTestHelper.runTestWithSkippedTask(jobProps, "_1");
@@ -366,4 +379,15 @@ public class MRJobLauncherTest extends BMNGRunner {
return jobProps;
}
+
+ public static class TestDynamicConfigGenerator implements DynamicConfigGenerator {
+ public TestDynamicConfigGenerator() {
+ }
+
+ @Override
+ public Config generateDynamicConfig(Config config) {
+ return ConfigFactory.parseMap(ImmutableMap.of(JobLauncherTestHelper.DYNAMIC_KEY1,
+ JobLauncherTestHelper.DYNAMIC_VALUE1));
+ }
+ }
}