You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2016/03/04 09:24:45 UTC
ambari git commit: AMBARI-14720. Exporting Blueprint doesn't have some configs. (Daniel Gergely via stoader)
Repository: ambari
Updated Branches:
refs/heads/trunk 65724a9b9 -> 75555dc56
AMBARI-14720. Exporting Blueprint doesn't have some configs. (Daniel Gergely via stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75555dc5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75555dc5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75555dc5
Branch: refs/heads/trunk
Commit: 75555dc56810bc296294d44623ace1bac598511a
Parents: 65724a9
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Fri Mar 4 09:23:52 2016 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Fri Mar 4 09:23:52 2016 +0100
----------------------------------------------------------------------
.../BlueprintConfigurationProcessor.java | 66 ++++++++++++++++--
.../BlueprintConfigurationProcessorTest.java | 72 ++++++++++++++++++++
2 files changed, 134 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/75555dc5/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
index 2d9a851..f5e7578 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
@@ -101,6 +101,12 @@ public class BlueprintConfigurationProcessor {
new HashMap<String, Map<String, PropertyUpdater>>();
/**
+ * Non topology related updaters
+ */
+ private static Map<String, Map<String, PropertyUpdater>> nonTopologyUpdaters =
+ new HashMap<String, Map<String, PropertyUpdater>>();
+
+ /**
* Updaters that preserve the original property value, functions
* as a placeholder for DB-related properties that need to be
* removed from export, but do not require an update during
@@ -360,6 +366,8 @@ public class BlueprintConfigurationProcessor {
doMultiHostExportUpdate(multiHostTopologyUpdaters, configuration);
+ doNonTopologyUpdate(nonTopologyUpdaters, configuration);
+
doRemovePropertyExport(removePropertyUpdaters, configuration);
doFilterPriorToExport(configuration);
@@ -1109,6 +1117,28 @@ public class BlueprintConfigurationProcessor {
}
/**
+ * Update non topology related configuration properties for blueprint export.
+ *
+ * @param updaters registered non topology updaters
+ * @param configuration configuration being processed
+ */
+ private void doNonTopologyUpdate(Map<String, Map<String, PropertyUpdater>> updaters, Configuration configuration) {
+ Map<String, Map<String, String>> properties = configuration.getFullProperties();
+ for (Map.Entry<String, Map<String, PropertyUpdater>> entry : updaters.entrySet()) {
+ String type = entry.getKey();
+ for (String propertyName : entry.getValue().keySet()) {
+ NonTopologyUpdater npu = (NonTopologyUpdater) entry.getValue().get(propertyName);
+ Map<String, String> typeProperties = properties.get(type);
+
+ if (typeProperties != null && typeProperties.containsKey(propertyName)) {
+ String newValue = npu.updateForBlueprintExport(propertyName, typeProperties.get(propertyName), properties, clusterTopology);
+ configuration.setProperty(type, propertyName, newValue);
+ }
+ }
+ }
+ }
+
+ /**
* Provides functionality to update a property value.
*/
public interface PropertyUpdater {
@@ -2137,6 +2167,13 @@ public class BlueprintConfigurationProcessor {
ClusterTopology topology) {
return Collections.emptyList();
}
+
+ public String updateForBlueprintExport(String propertyName,
+ String origValue,
+ Map<String, Map<String, String>> properties,
+ ClusterTopology topology) {
+ return origValue;
+ }
}
@@ -2149,6 +2186,7 @@ public class BlueprintConfigurationProcessor {
allUpdaters.add(multiHostTopologyUpdaters);
allUpdaters.add(dbHostTopologyUpdaters);
allUpdaters.add(mPropertyUpdaters);
+ allUpdaters.add(nonTopologyUpdaters);
Map<String, PropertyUpdater> hdfsSiteMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> mapredSiteMap = new HashMap<String, PropertyUpdater>();
@@ -2156,12 +2194,15 @@ public class BlueprintConfigurationProcessor {
Map<String, PropertyUpdater> hbaseSiteMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> yarnSiteMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> hiveSiteMap = new HashMap<String, PropertyUpdater>();
+ Map<String, PropertyUpdater> hiveSiteNonTopologyMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> oozieSiteOriginalValueMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> oozieSiteMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> stormSiteMap = new HashMap<String, PropertyUpdater>();
+ Map<String, PropertyUpdater> stormSiteNonTopologyMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> accumuloSiteMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> falconStartupPropertiesMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> kafkaBrokerMap = new HashMap<String, PropertyUpdater>();
+ Map<String, PropertyUpdater> kafkaBrokerNonTopologyMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> atlasPropsMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> mapredEnvMap = new HashMap<String, PropertyUpdater>();
Map<String, PropertyUpdater> hadoopEnvMap = new HashMap<String, PropertyUpdater>();
@@ -2229,6 +2270,10 @@ public class BlueprintConfigurationProcessor {
removePropertyUpdaters.put("oozie-env", oozieEnvOriginalValueMap);
removePropertyUpdaters.put("oozie-site", oozieSiteOriginalValueMap);
+ nonTopologyUpdaters.put("hive-site", hiveSiteNonTopologyMap);
+ nonTopologyUpdaters.put("kafka-broker", kafkaBrokerNonTopologyMap);
+ nonTopologyUpdaters.put("storm-site", stormSiteNonTopologyMap);
+
//todo: Need to change updaters back to being static
//todo: will need to pass ClusterTopology in as necessary
@@ -2293,7 +2338,7 @@ public class BlueprintConfigurationProcessor {
multiHiveSiteMap.put("hive.cluster.delegation.token.store.zookeeper.connectString", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER"));
// HIVE Atlas integration
- hiveSiteMap.put("hive.exec.post.hooks", new NonTopologyUpdater() {
+ hiveSiteNonTopologyMap.put("hive.exec.post.hooks", new NonTopologyUpdater() {
@Override
public String updateForClusterCreate(String propertyName,
String origValue,
@@ -2314,7 +2359,7 @@ public class BlueprintConfigurationProcessor {
});
//todo: john - this property should be moved to atlas configuration
- hiveSiteMap.put("atlas.cluster.name", new NonTopologyUpdater() {
+ hiveSiteNonTopologyMap.put("atlas.cluster.name", new NonTopologyUpdater() {
@Override
public String updateForClusterCreate(String propertyName,
String origValue,
@@ -2334,6 +2379,19 @@ public class BlueprintConfigurationProcessor {
return origValue;
}
}
+
+ @Override
+ public String updateForBlueprintExport(String propertyName,
+ String origValue,
+ Map<String, Map<String, String>> properties,
+ ClusterTopology topology) {
+
+ // if the value is the cluster id, then update to primary
+ if (origValue.equals(String.valueOf(topology.getClusterId()))) {
+ return "primary";
+ }
+ return origValue;
+ }
});
//todo: john - this property should be removed
@@ -2387,7 +2445,7 @@ public class BlueprintConfigurationProcessor {
stormSiteMap.put("supervisor.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
// Storm AMS integration
- stormSiteMap.put("metrics.reporter.register", new NonTopologyUpdater() {
+ stormSiteNonTopologyMap.put("metrics.reporter.register", new NonTopologyUpdater() {
@Override
public String updateForClusterCreate(String propertyName,
String origValue,
@@ -2420,7 +2478,7 @@ public class BlueprintConfigurationProcessor {
// KAFKA
kafkaBrokerMap.put("kafka.ganglia.metrics.host", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
// KAFKA AMS integration
- kafkaBrokerMap.put("kafka.metrics.reporters", new NonTopologyUpdater() {
+ kafkaBrokerNonTopologyMap.put("kafka.metrics.reporters", new NonTopologyUpdater() {
@Override
public String updateForClusterCreate(String propertyName,
String origValue,
http://git-wip-us.apache.org/repos/asf/ambari/blob/75555dc5/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index 30351d8..68d5755 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -6662,6 +6662,78 @@ public class BlueprintConfigurationProcessorTest {
assertEquals(createHostAddress(expectedHostNameNamenode, expectedPortNamenode) + "/hawq_default", hawqSite.get("hawq_dfs_url"));
}
+ @Test
+ public void testDoUpdateForBlueprintExport_NonTopologyProperty__AtlasClusterName() throws Exception {
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+ Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+
+ Collection<String> hgComponents = new HashSet<String>();
+ hgComponents.add("ATLAS_SERVER");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
+
+ Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
+ hostGroups.add(group1);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+ Long clusterId = topology.getClusterId();
+ Map<String, String> typeProps = new HashMap<String, String>();
+ typeProps.put("atlas.cluster.name", String.valueOf(clusterId));
+ properties.put("hive-site", typeProps);
+
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+ configProcessor.doUpdateForBlueprintExport();
+
+ String updatedVal = properties.get("hive-site").get("atlas.cluster.name");
+ assertEquals("primary", updatedVal);
+ }
+
+ @Test
+ public void testDoUpdateForBlueprintExport_NonTopologyProperty() throws Exception {
+ String someString = "String.To.Represent.A.String.Value";
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+ Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+
+ Collection<String> hgComponents = new HashSet<String>();
+ hgComponents.add("ATLAS_SERVER");
+ hgComponents.add("HIVE_SERVER");
+ hgComponents.add("KAFKA_BROKER");
+ hgComponents.add("NIMBUS");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
+
+ Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
+ hostGroups.add(group1);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+ Long clusterId = topology.getClusterId();
+
+ Map<String, String> hiveSiteProps = new HashMap<String, String>();
+ hiveSiteProps.put("atlas.cluster.name", String.valueOf(clusterId));
+ hiveSiteProps.put("hive.exec.post.hooks", someString);
+ properties.put("hive-site", hiveSiteProps);
+
+ Map<String, String> kafkaBrokerProps = new HashMap<String, String>();
+ kafkaBrokerProps.put("kafka.metrics.reporters", someString);
+ properties.put("kafka-broker", kafkaBrokerProps);
+
+ Map<String, String> stormSiteProps = new HashMap<String, String>();
+ stormSiteProps.put("metrics.reporter.register", someString);
+ properties.put("storm-site", stormSiteProps);
+
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+ configProcessor.doUpdateForBlueprintExport();
+
+ String atlasClusterName = properties.get("hive-site").get("atlas.cluster.name");
+ String hiveExecPostHooks = properties.get("hive-site").get("hive.exec.post.hooks");
+ String kafkaMetricsReporters = properties.get("kafka-broker").get("kafka.metrics.reporters");
+ String metricsReporterRegister = properties.get("storm-site").get("metrics.reporter.register");
+ assertEquals("primary", atlasClusterName);
+ assertEquals(someString, hiveExecPostHooks);
+ assertEquals(someString, kafkaMetricsReporters);
+ assertEquals(someString, metricsReporterRegister);
+ }
+
private Map<String, AdvisedConfiguration> createAdvisedConfigMap() {
Map<String, AdvisedConfiguration> advMap = new HashMap<String, AdvisedConfiguration>();