Posted to commits@ambari.apache.org by st...@apache.org on 2016/03/04 09:24:45 UTC

ambari git commit: AMBARI-14720. Exporting Blueprint doesn't have some configs. (Daniel Gergely via stoader)

Repository: ambari
Updated Branches:
  refs/heads/trunk 65724a9b9 -> 75555dc56


AMBARI-14720. Exporting Blueprint doesn't have some configs. (Daniel Gergely via stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75555dc5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75555dc5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75555dc5

Branch: refs/heads/trunk
Commit: 75555dc56810bc296294d44623ace1bac598511a
Parents: 65724a9
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Fri Mar 4 09:23:52 2016 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Fri Mar 4 09:23:52 2016 +0100

----------------------------------------------------------------------
 .../BlueprintConfigurationProcessor.java        | 66 ++++++++++++++++--
 .../BlueprintConfigurationProcessorTest.java    | 72 ++++++++++++++++++++
 2 files changed, 134 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
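
Per the JIRA title, some configs were missing from the exported blueprint: the
AMS/Atlas integration properties (hive.exec.post.hooks, atlas.cluster.name,
metrics.reporter.register, kafka.metrics.reporters) had been registered in the
host-topology maps (hiveSiteMap, stormSiteMap, kafkaBrokerMap), which the
export pass treats as host references. The patch moves them into a dedicated
nonTopologyUpdaters registry and adds an updateForBlueprintExport hook to
NonTopologyUpdater, whose default implementation returns the original value
unchanged. A minimal sketch of the new pattern (illustrative only:
"my-example-site" and "my.example.property" are hypothetical names, and the
snippet assumes it lives inside BlueprintConfigurationProcessor, where
NonTopologyUpdater and the registry are in scope):

    Map<String, PropertyUpdater> exampleMap = new HashMap<String, PropertyUpdater>();
    exampleMap.put("my.example.property", new NonTopologyUpdater() {
      @Override
      public String updateForBlueprintExport(String propertyName,
                                             String origValue,
                                             Map<String, Map<String, String>> properties,
                                             ClusterTopology topology) {
        // Replace a cluster-specific value with a portable placeholder
        // before it is written into the exported blueprint.
        return "EXPORT_PLACEHOLDER";
      }
    });
    nonTopologyUpdaters.put("my-example-site", exampleMap);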


http://git-wip-us.apache.org/repos/asf/ambari/blob/75555dc5/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
index 2d9a851..f5e7578 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
@@ -101,6 +101,12 @@ public class BlueprintConfigurationProcessor {
       new HashMap<String, Map<String, PropertyUpdater>>();
 
   /**
+   * Non-topology-related updaters
+   */
+  private static Map<String, Map<String, PropertyUpdater>> nonTopologyUpdaters =
+      new HashMap<String, Map<String, PropertyUpdater>>();
+
+  /**
    * Updaters that preserve the original property value, functions
    * as a placeholder for DB-related properties that need to be
    * removed from export, but do not require an update during
@@ -360,6 +366,8 @@ public class BlueprintConfigurationProcessor {
 
       doMultiHostExportUpdate(multiHostTopologyUpdaters, configuration);
 
+      doNonTopologyUpdate(nonTopologyUpdaters, configuration);
+
       doRemovePropertyExport(removePropertyUpdaters, configuration);
 
       doFilterPriorToExport(configuration);
@@ -1109,6 +1117,28 @@ public class BlueprintConfigurationProcessor {
   }
 
   /**
+   * Updates non-topology-related configuration properties for blueprint export.
+   *
+   * @param updaters       registered non-topology updaters
+   * @param configuration  configuration being processed
+   */
+  private void doNonTopologyUpdate(Map<String, Map<String, PropertyUpdater>> updaters, Configuration configuration) {
+    Map<String, Map<String, String>> properties = configuration.getFullProperties();
+    for (Map.Entry<String, Map<String, PropertyUpdater>> entry : updaters.entrySet()) {
+      String type = entry.getKey();
+      for (String propertyName : entry.getValue().keySet()) {
+        NonTopologyUpdater npu = (NonTopologyUpdater) entry.getValue().get(propertyName);
+        Map<String, String> typeProperties = properties.get(type);
+
+        if (typeProperties != null && typeProperties.containsKey(propertyName)) {
+          String newValue = npu.updateForBlueprintExport(propertyName, typeProperties.get(propertyName), properties, clusterTopology);
+          configuration.setProperty(type, propertyName, newValue);
+        }
+      }
+    }
+  }
+
+  /**
    * Provides functionality to update a property value.
    */
   public interface PropertyUpdater {
@@ -2137,6 +2167,13 @@ public class BlueprintConfigurationProcessor {
                                                     ClusterTopology topology) {
       return Collections.emptyList();
     }
+
+    public String updateForBlueprintExport(String propertyName,
+                                           String origValue,
+                                           Map<String, Map<String, String>> properties,
+                                           ClusterTopology topology) {
+      return origValue;
+    }
   }
 
 
@@ -2149,6 +2186,7 @@ public class BlueprintConfigurationProcessor {
     allUpdaters.add(multiHostTopologyUpdaters);
     allUpdaters.add(dbHostTopologyUpdaters);
     allUpdaters.add(mPropertyUpdaters);
+    allUpdaters.add(nonTopologyUpdaters);
 
     Map<String, PropertyUpdater> hdfsSiteMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> mapredSiteMap = new HashMap<String, PropertyUpdater>();
@@ -2156,12 +2194,15 @@ public class BlueprintConfigurationProcessor {
     Map<String, PropertyUpdater> hbaseSiteMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> yarnSiteMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> hiveSiteMap = new HashMap<String, PropertyUpdater>();
+    Map<String, PropertyUpdater> hiveSiteNonTopologyMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> oozieSiteOriginalValueMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> oozieSiteMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> stormSiteMap = new HashMap<String, PropertyUpdater>();
+    Map<String, PropertyUpdater> stormSiteNonTopologyMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> accumuloSiteMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> falconStartupPropertiesMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> kafkaBrokerMap = new HashMap<String, PropertyUpdater>();
+    Map<String, PropertyUpdater> kafkaBrokerNonTopologyMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> atlasPropsMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> mapredEnvMap = new HashMap<String, PropertyUpdater>();
     Map<String, PropertyUpdater> hadoopEnvMap = new HashMap<String, PropertyUpdater>();
@@ -2229,6 +2270,10 @@ public class BlueprintConfigurationProcessor {
     removePropertyUpdaters.put("oozie-env", oozieEnvOriginalValueMap);
     removePropertyUpdaters.put("oozie-site", oozieSiteOriginalValueMap);
 
+    nonTopologyUpdaters.put("hive-site", hiveSiteNonTopologyMap);
+    nonTopologyUpdaters.put("kafka-broker", kafkaBrokerNonTopologyMap);
+    nonTopologyUpdaters.put("storm-site", stormSiteNonTopologyMap);
+
     //todo: Need to change updaters back to being static
     //todo: will need to pass ClusterTopology in as necessary
 
@@ -2293,7 +2338,7 @@ public class BlueprintConfigurationProcessor {
     multiHiveSiteMap.put("hive.cluster.delegation.token.store.zookeeper.connectString", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER"));
 
     // HIVE Atlas integration
-    hiveSiteMap.put("hive.exec.post.hooks", new NonTopologyUpdater() {
+    hiveSiteNonTopologyMap.put("hive.exec.post.hooks", new NonTopologyUpdater() {
       @Override
       public String updateForClusterCreate(String propertyName,
                                            String origValue,
@@ -2314,7 +2359,7 @@ public class BlueprintConfigurationProcessor {
     });
 
     //todo: john - this property should be moved to atlas configuration
-    hiveSiteMap.put("atlas.cluster.name", new NonTopologyUpdater() {
+    hiveSiteNonTopologyMap.put("atlas.cluster.name", new NonTopologyUpdater() {
       @Override
       public String updateForClusterCreate(String propertyName,
                                            String origValue,
@@ -2334,6 +2379,19 @@ public class BlueprintConfigurationProcessor {
           return origValue;
         }
       }
+
+      @Override
+      public String updateForBlueprintExport(String propertyName,
+                                             String origValue,
+                                             Map<String, Map<String, String>> properties,
+                                             ClusterTopology topology) {
+
+        // if the value is the cluster id, then update to primary
+        if (origValue.equals(String.valueOf(topology.getClusterId()))) {
+          return "primary";
+        }
+        return origValue;
+      }
     });
 
     //todo: john - this property should be removed
@@ -2387,7 +2445,7 @@ public class BlueprintConfigurationProcessor {
     stormSiteMap.put("supervisor.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     // Storm AMS integration
-    stormSiteMap.put("metrics.reporter.register", new NonTopologyUpdater() {
+    stormSiteNonTopologyMap.put("metrics.reporter.register", new NonTopologyUpdater() {
       @Override
       public String updateForClusterCreate(String propertyName,
                                            String origValue,
@@ -2420,7 +2478,7 @@ public class BlueprintConfigurationProcessor {
     // KAFKA
     kafkaBrokerMap.put("kafka.ganglia.metrics.host", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     // KAFKA AMS integration
-    kafkaBrokerMap.put("kafka.metrics.reporters", new NonTopologyUpdater() {
+    kafkaBrokerNonTopologyMap.put("kafka.metrics.reporters", new NonTopologyUpdater() {
       @Override
       public String updateForClusterCreate(String propertyName,
                                            String origValue,

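The new tests below pin down the export behavior end to end. Condensed, and
assuming a configured ClusterTopology and its backing properties map (the
"topology" and "properties" names come from the test fixtures built via
createClusterTopology below), the expected effect is:

    // Sketch of the behavior the tests verify: a cluster-id-valued
    // atlas.cluster.name is rewritten at export time.
    Map<String, String> hiveSite = new HashMap<String, String>();
    hiveSite.put("atlas.cluster.name", String.valueOf(topology.getClusterId()));
    properties.put("hive-site", hiveSite);

    BlueprintConfigurationProcessor processor = new BlueprintConfigurationProcessor(topology);
    processor.doUpdateForBlueprintExport();

    // The numeric cluster id becomes the portable default "primary";
    // updaters that do not override the hook leave their values untouched.
    assert "primary".equals(properties.get("hive-site").get("atlas.cluster.name"));
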
http://git-wip-us.apache.org/repos/asf/ambari/blob/75555dc5/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index 30351d8..68d5755 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -6662,6 +6662,78 @@ public class BlueprintConfigurationProcessorTest {
     assertEquals(createHostAddress(expectedHostNameNamenode, expectedPortNamenode) + "/hawq_default", hawqSite.get("hawq_dfs_url"));
   }
 
+  @Test
+  public void testDoUpdateForBlueprintExport_NonTopologyProperty__AtlasClusterName() throws Exception {
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+    Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+
+    Collection<String> hgComponents = new HashSet<String>();
+    hgComponents.add("ATLAS_SERVER");
+    TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
+
+    Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
+    hostGroups.add(group1);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+    Long clusterId = topology.getClusterId();
+    Map<String, String> typeProps = new HashMap<String, String>();
+    typeProps.put("atlas.cluster.name", String.valueOf(clusterId));
+    properties.put("hive-site", typeProps);
+
+    BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+    configProcessor.doUpdateForBlueprintExport();
+
+    String updatedVal = properties.get("hive-site").get("atlas.cluster.name");
+    assertEquals("primary", updatedVal);
+  }
+
+  @Test
+  public void testDoUpdateForBlueprintExport_NonTopologyProperty() throws Exception {
+    String someString = "String.To.Represent.A.String.Value";
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+    Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+
+    Collection<String> hgComponents = new HashSet<String>();
+    hgComponents.add("ATLAS_SERVER");
+    hgComponents.add("HIVE_SERVER");
+    hgComponents.add("KAFKA_BROKER");
+    hgComponents.add("NIMBUS");
+    TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
+
+    Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
+    hostGroups.add(group1);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+    Long clusterId = topology.getClusterId();
+
+    Map<String, String> hiveSiteProps = new HashMap<String, String>();
+    hiveSiteProps.put("atlas.cluster.name", String.valueOf(clusterId));
+    hiveSiteProps.put("hive.exec.post.hooks", someString);
+    properties.put("hive-site", hiveSiteProps);
+
+    Map<String, String> kafkaBrokerProps = new HashMap<String, String>();
+    kafkaBrokerProps.put("kafka.metrics.reporters", someString);
+    properties.put("kafka-broker", kafkaBrokerProps);
+
+    Map<String, String> stormSiteProps = new HashMap<String, String>();
+    stormSiteProps.put("metrics.reporter.register", someString);
+    properties.put("storm-site", stormSiteProps);
+
+    BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+    configProcessor.doUpdateForBlueprintExport();
+
+    String atlasClusterName = properties.get("hive-site").get("atlas.cluster.name");
+    String hiveExecPostHooks = properties.get("hive-site").get("hive.exec.post.hooks");
+    String kafkaMetricsReporters = properties.get("kafka-broker").get("kafka.metrics.reporters");
+    String metricsReporterRegister = properties.get("storm-site").get("metrics.reporter.register");
+    assertEquals("primary", atlasClusterName);
+    assertEquals(someString, hiveExecPostHooks);
+    assertEquals(someString, kafkaMetricsReporters);
+    assertEquals(someString, metricsReporterRegister);
+  }
+
 
   private Map<String, AdvisedConfiguration> createAdvisedConfigMap() {
     Map<String, AdvisedConfiguration> advMap = new HashMap<String, AdvisedConfiguration>();