You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/11/18 17:51:16 UTC

[20/50] [abbrv] ambari git commit: AMBARI-13908 Kafka's metrics are not displayed on cluster installed via blueprints (dsen)

AMBARI-13908 Kafka's metrics are not displayed on cluster installed via blueprints (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ed2d58a6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ed2d58a6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ed2d58a6

Branch: refs/heads/branch-dev-patch-upgrade
Commit: ed2d58a6b051e4a465cc79bfece96dd8e7daf0e4
Parents: 69db84a
Author: Dmytro Sen <ds...@apache.org>
Authored: Tue Nov 17 19:07:18 2015 +0200
Committer: Dmytro Sen <ds...@apache.org>
Committed: Tue Nov 17 19:07:18 2015 +0200

----------------------------------------------------------------------
 .../BlueprintConfigurationProcessor.java        |  37 +++++
 .../0.9.1.2.1/configuration/storm-site.xml      |   7 +
 .../stacks/HDP/2.0.6/services/stack_advisor.py  |  22 +++
 .../stacks/HDP/2.2/services/stack_advisor.py    |   2 +-
 .../BlueprintConfigurationProcessorTest.java    | 134 +++++++++++++++++++
 .../stacks/2.0.6/common/test_stack_advisor.py   |  88 ++++++++++++
 6 files changed, 289 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
index 9e2bf85..0138894 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
@@ -2238,6 +2238,24 @@ public class BlueprintConfigurationProcessor {
     stormSiteMap.put("worker.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     stormSiteMap.put("supervisor.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
+    // Storm AMS integration: default the reporter to the AMS one when AMBARI_METRICS is in the blueprint
+    stormSiteMap.put("metrics.reporter.register", new NonTopologyUpdater() {
+      @Override
+      public String updateForClusterCreate(String propertyName,
+                                           String origValue,
+                                           Map<String, Map<String, String>> properties,
+                                           ClusterTopology topology) {
+
+        final boolean amsInBlueprint = topology.getBlueprint().getServices().contains("AMBARI_METRICS");
+        final boolean reporterUnset = origValue == null || origValue.isEmpty();
+        if (amsInBlueprint && reporterUnset) {
+          // only an unset or empty value is replaced; any configured reporter is returned unchanged
+          return "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter";
+        }
+        return origValue;
+      }
+    });
+
     multiStormSiteMap.put("supervisor_hosts",
         new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("SUPERVISOR")));
     multiStormSiteMap.put("storm.zookeeper.servers",
@@ -2253,6 +2271,25 @@ public class BlueprintConfigurationProcessor {
 
     // KAFKA
     kafkaBrokerMap.put("kafka.ganglia.metrics.host", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
+    // KAFKA AMS integration: make sure the AMS reporter is listed when AMBARI_METRICS is in the blueprint
+    kafkaBrokerMap.put("kafka.metrics.reporters", new NonTopologyUpdater() {
+      @Override
+      public String updateForClusterCreate(String propertyName,
+                                           String origValue,
+                                           Map<String, Map<String, String>> properties,
+                                           ClusterTopology topology) {
+        // without AMBARI_METRICS in the blueprint the configured value is passed through untouched
+        if (!topology.getBlueprint().getServices().contains("AMBARI_METRICS")) {
+          return origValue;
+        }
+        final String amsReporterClass = "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter";
+        if (origValue == null || origValue.isEmpty()) {
+          return amsReporterClass;
+        }
+        // append to the existing list unless the AMS reporter is already part of it
+        return origValue.contains(amsReporterClass) ? origValue : String.format("%s,%s", origValue, amsReporterClass);
+      }
+    });
 
     // KNOX
     multiCoreSiteMap.put("hadoop.proxyuser.knox.hosts", new MultipleHostTopologyUpdater("KNOX_GATEWAY"));

http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
index 1020f6a..3e9a90e 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
@@ -720,4 +720,11 @@
        "storm dev-zookeeper". This zookeeper instance is only intended for development;
        it is not a production grade zookeeper setup.</description>
   </property>
+  <property>
+    <name>metrics.reporter.register</name>
+    <description>Topology metrics reporter.</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index bd25f7d..935e88b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -86,6 +86,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
       "MAPREDUCE2": self.recommendMapReduce2Configurations,
       "HDFS": self.recommendHDFSConfigurations,
       "HBASE": self.recommendHbaseConfigurations,
+      "STORM": self.recommendStormConfigurations,
       "AMBARI_METRICS": self.recommendAmsConfigurations,
       "RANGER": self.recommendRangerConfigurations
     }
@@ -439,6 +440,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
     return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count
 
+  def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
+    # Storm AMS integration: recommend the AMS reporter when AMBARI_METRICS is among the listed services
+    putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
+    installedServiceNames = [entry["StackServices"]["service_name"] for entry in services["services"]]
+    if 'AMBARI_METRICS' in installedServiceNames:
+      putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter')
+
   def recommendAmsConfigurations(self, configurations, clusterData, services, hosts):
     putAmsEnvProperty = self.putProperty(configurations, "ams-env", services)
     putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services)
@@ -757,6 +765,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
       "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations},
       "YARN": {"yarn-site": self.validateYARNConfigurations},
       "HBASE": {"hbase-env": self.validateHbaseEnvConfigurations},
+      "STORM": {"storm-site": self.validateStormConfigurations},
       "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations,
               "ams-hbase-env": self.validateAmsHbaseEnvConfigurations,
               "ams-site": self.validateAmsSiteConfigurations}
@@ -889,6 +898,19 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
     return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")
 
+  def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    # Storm AMS integration: warn when AMBARI_METRICS is listed but the AMS reporter is not registered
+    validationItems = []
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    # get() yields None when the property is absent; fall back to "" so 'not in' cannot raise TypeError
+    if 'AMBARI_METRICS' in servicesList and \
+      "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter" not in (properties.get("metrics.reporter.register") or ""):
+      validationItems.append({"config-name": 'metrics.reporter.register',
+                              "item": self.getWarnItem(
+                                "Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter to report the metrics to Ambari Metrics service.")})
+
+    return self.toConfigurationValidationProblems(validationItems, "storm-site")
+
   def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
     regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added
     masterItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")

http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index b51af8b..007abb8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -784,8 +784,8 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
       putTezProperty("tez.tez-ui.history-url.base", tez_url)
     pass
 
-
   def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
+    super(HDP22StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts)
     putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
     putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site")
     storm_site = getServicesSiteProperties(services, "storm-site")

http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index 1e75dbf..6d2758a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -153,6 +153,14 @@ public class BlueprintConfigurationProcessorTest {
     atlasComponents.add("ATLAS_CLIENT");
     serviceComponents.put("ATLAS", atlasComponents);
 
+    Collection<String> amsComponents = new HashSet<String>();
+    amsComponents.add("METRICS_COLLECTOR");
+    serviceComponents.put("AMBARI_METRICS", amsComponents);
+
+    Collection<String> stormComponents = new HashSet<String>();
+    stormComponents.add("NIMBUS");
+    serviceComponents.put("STORM", stormComponents);
+
     for (Map.Entry<String, Collection<String>> entry : serviceComponents.entrySet()) {
       String service = entry.getKey();
       for (String component : entry.getValue()) {
@@ -5217,6 +5225,132 @@ public class BlueprintConfigurationProcessorTest {
   }
 
   @Test
+  public void testStormAmsPropertiesDefault() throws Exception {
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+    // storm-site carries an empty metrics.reporter.register value
+    Map<String, String> stormSite = new HashMap<String, String>();
+    //default
+    stormSite.put("metrics.reporter.register", "");
+    properties.put("storm-site", stormSite);
+
+    Map<String, Map<String, String>> parentProperties = new HashMap<String, Map<String, String>>();
+    Configuration parentClusterConfig = new Configuration(parentProperties,
+        Collections.<String, Map<String, Map<String, String>>>emptyMap());
+    Configuration clusterConfig = new Configuration(properties,
+        Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+    // a single host group carrying METRICS_COLLECTOR and NIMBUS
+    Collection<String> hgComponents1 = new HashSet<String>();
+    hgComponents1.add("METRICS_COLLECTOR");
+    hgComponents1.add("NIMBUS");
+    TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1"));
+
+    Collection<TestHostGroup> hostGroups = Collections.singletonList(group1);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+    BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+    configProcessor.doUpdateForClusterCreate();
+    // the empty value is expected to be replaced by the AMS Storm reporter
+    assertEquals("org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter",
+      clusterConfig.getPropertyValue("storm-site", "metrics.reporter.register"));
+  }
+
+  @Test
+  public void testStormAmsPropertiesUserDefinedReporter() throws Exception {
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+    Map<String, String> stormSite = new HashMap<String, String>();
+    // user-defined reporter instead of the empty default
+    stormSite.put("metrics.reporter.register", "user.Reporter");
+    properties.put("storm-site", stormSite);
+
+    Map<String, Map<String, String>> parentProperties = new HashMap<String, Map<String, String>>();
+    Configuration parentClusterConfig = new Configuration(parentProperties,
+        Collections.<String, Map<String, Map<String, String>>>emptyMap());
+    Configuration clusterConfig = new Configuration(properties,
+        Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+    // a single host group carrying METRICS_COLLECTOR and NIMBUS
+    Collection<String> hgComponents1 = new HashSet<String>();
+    hgComponents1.add("METRICS_COLLECTOR");
+    hgComponents1.add("NIMBUS");
+    TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1"));
+
+    Collection<TestHostGroup> hostGroups = Collections.singletonList(group1);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+    BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+    configProcessor.doUpdateForClusterCreate();
+    // the user-defined reporter is expected to stay as configured
+    assertEquals("user.Reporter",
+      clusterConfig.getPropertyValue("storm-site", "metrics.reporter.register"));
+  }
+
+  @Test
+  public void testKafkaAmsProperties() throws Exception {
+    // kafka-broker with the default (empty) reporter list
+    Map<String, String> kafkaBroker = new HashMap<String, String>();
+    kafkaBroker.put("kafka.metrics.reporters", "");
+
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+    properties.put("kafka-broker", kafkaBroker);
+
+    Configuration parentClusterConfig = new Configuration(new HashMap<String, Map<String, String>>(),
+      Collections.<String, Map<String, Map<String, String>>>emptyMap());
+    Configuration clusterConfig = new Configuration(properties,
+      Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+    // one host group carrying both METRICS_COLLECTOR and KAFKA_BROKER
+    Collection<String> amsAndKafkaComponents = new HashSet<String>();
+    amsAndKafkaComponents.add("METRICS_COLLECTOR");
+    amsAndKafkaComponents.add("KAFKA_BROKER");
+    TestHostGroup hostGroup = new TestHostGroup("group1", amsAndKafkaComponents, Collections.singleton("host1"));
+    Collection<TestHostGroup> hostGroups = Collections.singletonList(hostGroup);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+
+    BlueprintConfigurationProcessor processor = new BlueprintConfigurationProcessor(topology);
+    processor.doUpdateForClusterCreate();
+
+    // the empty value is expected to be replaced by the AMS Kafka reporter
+    assertEquals("org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter",
+      clusterConfig.getPropertyValue("kafka-broker", "kafka.metrics.reporters"));
+
+  }
+
+  @Test
+  public void testKafkaAmsPropertiesMultipleReporters() throws Exception {
+    // kafka-broker already lists a user-defined reporter
+    Map<String, String> kafkaBroker = new HashMap<String, String>();
+    kafkaBroker.put("kafka.metrics.reporters", "user.Reporter");
+
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+    properties.put("kafka-broker", kafkaBroker);
+
+    Configuration parentClusterConfig = new Configuration(new HashMap<String, Map<String, String>>(),
+      Collections.<String, Map<String, Map<String, String>>>emptyMap());
+    Configuration clusterConfig = new Configuration(properties,
+      Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+    // one host group carrying both METRICS_COLLECTOR and KAFKA_BROKER
+    Collection<String> amsAndKafkaComponents = new HashSet<String>();
+    amsAndKafkaComponents.add("METRICS_COLLECTOR");
+    amsAndKafkaComponents.add("KAFKA_BROKER");
+    TestHostGroup hostGroup = new TestHostGroup("group1", amsAndKafkaComponents, Collections.singleton("host1"));
+    Collection<TestHostGroup> hostGroups = Collections.singletonList(hostGroup);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+
+    BlueprintConfigurationProcessor processor = new BlueprintConfigurationProcessor(topology);
+    processor.doUpdateForClusterCreate();
+
+    // the AMS Kafka reporter is expected to be appended after the user-defined one
+    assertEquals("user.Reporter,org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter",
+      clusterConfig.getPropertyValue("kafka-broker", "kafka.metrics.reporters"));
+
+  }
+
+  @Test
   public void testAtlasHiveProperties2() throws Exception {
     Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
     Map<String, String> atlasProperties = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
index a28f764..456977c 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
@@ -465,6 +465,49 @@ class TestHDP206StackAdvisor(TestCase):
 
     self.assertEquals(result, expected)
 
+  def test_recommendStormConfigurations(self):
+    # no AMS
+    configurations = {}
+    services = {
+      "services":  [
+      ],
+      "configurations": configurations
+    }
+    # with no services listed, only an empty storm-site properties section is expected
+    expected = {
+      "storm-site": {
+        "properties": {
+        }
+      },
+    }
+
+    self.stackAdvisor.recommendStormConfigurations(configurations, None, services, None)
+    self.assertEquals(configurations, expected)
+
+    # with AMS
+    configurations = {}
+    services = {
+      "services":  [
+        {
+          "StackServices": {
+            "service_name": "AMBARI_METRICS"
+          }
+        }
+      ],
+      "configurations": configurations
+    }
+    # with AMBARI_METRICS listed, the AMS Storm reporter is expected in storm-site
+    expected = {
+      "storm-site": {
+        "properties": {
+          "metrics.reporter.register": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"
+        }
+      },
+    }
+
+    self.stackAdvisor.recommendStormConfigurations(configurations, None, services, None)
+    self.assertEquals(configurations, expected)
+
   def test_recommendYARNConfigurations(self):
     configurations = {}
     services = {"configurations": configurations}
@@ -1379,6 +1422,51 @@ class TestHDP206StackAdvisor(TestCase):
     ]
     self.assertEquals(res, expected)
 
+  def test_validateStormSiteConfigurations(self):
+    configurations = {
+      "storm-site": {
+        "properties": {
+          'metrics.reporter.register': "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"
+        }
+      }
+    }
+    # properties start out with the AMS Storm reporter registered
+    recommendedDefaults = {
+      'metrics.reporter.register': 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter',
+    }
+    properties = {
+      'metrics.reporter.register': 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter',
+    }
+
+    services = {
+      "services":  [
+        {
+          "StackServices": {
+            "service_name": "AMBARI_METRICS"
+          }
+        }
+      ],
+      "configurations": configurations
+    }
+
+    # positive
+    res = self.stackAdvisor.validateStormConfigurations(properties, recommendedDefaults, configurations, services, None)
+    expected = []
+    self.assertEquals(res, expected)
+    properties['metrics.reporter.register'] = ''
+    # negative: with the reporter blanked out, a WARN item is expected
+    res = self.stackAdvisor.validateStormConfigurations(properties, recommendedDefaults, configurations, services, None)
+    expected = [
+      {'config-name': 'metrics.reporter.register',
+       'config-type': 'storm-site',
+       'level': 'WARN',
+       'message': 'Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter '
+                  'to report the metrics to Ambari Metrics service.',
+       'type': 'configuration'
+      }
+    ]
+    self.assertEquals(res, expected)
+
   def test_getHostsWithComponent(self):
     services = {"services":
                   [{"StackServices":