Posted to commits@ambari.apache.org by nc...@apache.org on 2015/11/18 17:51:16 UTC
[20/50] [abbrv] ambari git commit: AMBARI-13908 Kafka's metrics are not displayed on cluster installed via blueprints (dsen)
AMBARI-13908 Kafka's metrics are not displayed on cluster installed via blueprints (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ed2d58a6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ed2d58a6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ed2d58a6
Branch: refs/heads/branch-dev-patch-upgrade
Commit: ed2d58a6b051e4a465cc79bfece96dd8e7daf0e4
Parents: 69db84a
Author: Dmytro Sen <ds...@apache.org>
Authored: Tue Nov 17 19:07:18 2015 +0200
Committer: Dmytro Sen <ds...@apache.org>
Committed: Tue Nov 17 19:07:18 2015 +0200
----------------------------------------------------------------------
.../BlueprintConfigurationProcessor.java | 37 +++++
.../0.9.1.2.1/configuration/storm-site.xml | 7 +
.../stacks/HDP/2.0.6/services/stack_advisor.py | 22 +++
.../stacks/HDP/2.2/services/stack_advisor.py | 2 +-
.../BlueprintConfigurationProcessorTest.java | 134 +++++++++++++++++++
.../stacks/2.0.6/common/test_stack_advisor.py | 88 ++++++++++++
6 files changed, 289 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
index 9e2bf85..0138894 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
@@ -2238,6 +2238,24 @@ public class BlueprintConfigurationProcessor {
stormSiteMap.put("worker.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
stormSiteMap.put("supervisor.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
+ // Storm AMS integration
+ stormSiteMap.put("metrics.reporter.register", new NonTopologyUpdater() {
+ @Override
+ public String updateForClusterCreate(String propertyName,
+ String origValue,
+ Map<String, Map<String, String>> properties,
+ ClusterTopology topology) {
+
+ if (topology.getBlueprint().getServices().contains("AMBARI_METRICS")) {
+ final String amsReporterClass = "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter";
+ if (origValue == null || origValue.isEmpty()) {
+ return amsReporterClass;
+ }
+ }
+ return origValue;
+ }
+ });
+
multiStormSiteMap.put("supervisor_hosts",
new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("SUPERVISOR")));
multiStormSiteMap.put("storm.zookeeper.servers",
@@ -2253,6 +2271,25 @@ public class BlueprintConfigurationProcessor {
// KAFKA
kafkaBrokerMap.put("kafka.ganglia.metrics.host", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
+ // KAFKA AMS integration
+ kafkaBrokerMap.put("kafka.metrics.reporters", new NonTopologyUpdater() {
+ @Override
+ public String updateForClusterCreate(String propertyName,
+ String origValue,
+ Map<String, Map<String, String>> properties,
+ ClusterTopology topology) {
+
+ if (topology.getBlueprint().getServices().contains("AMBARI_METRICS")) {
+ final String amsReporterClass = "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter";
+ if (origValue == null || origValue.isEmpty()) {
+ return amsReporterClass;
+ } else if (!origValue.contains(amsReporterClass)) {
+ return String.format("%s,%s", origValue, amsReporterClass);
+ }
+ }
+ return origValue;
+ }
+ });
// KNOX
multiCoreSiteMap.put("hadoop.proxyuser.knox.hosts", new MultipleHostTopologyUpdater("KNOX_GATEWAY"));
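The two updaters above differ deliberately: metrics.reporter.register holds a
single reporter class, so a user-supplied value always wins, while
kafka.metrics.reporters is a comma-separated list, so the AMS reporter is
appended when the user already registered reporters of their own. A minimal
Python sketch of that merge logic (not part of this commit; names here are
illustrative):

  STORM_REPORTER = "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"
  KAFKA_REPORTER = "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"

  def update_reporter(orig_value, services, reporter, appendable):
      # Without AMBARI_METRICS in the blueprint the value is left untouched.
      if "AMBARI_METRICS" not in services:
          return orig_value
      # Empty or missing value: install the AMS reporter.
      if not orig_value:
          return reporter
      # List-valued property: append the AMS reporter after the user's own.
      if appendable and reporter not in orig_value:
          return "%s,%s" % (orig_value, reporter)
      # Single-valued property, or reporter already present: keep user's value.
      return orig_value

  # Mirrors the unit tests added below:
  assert update_reporter("", {"AMBARI_METRICS"}, STORM_REPORTER, False) == STORM_REPORTER
  assert update_reporter("user.Reporter", {"AMBARI_METRICS"}, STORM_REPORTER, False) == "user.Reporter"
  assert update_reporter("user.Reporter", {"AMBARI_METRICS"}, KAFKA_REPORTER, True) \
         == "user.Reporter," + KAFKA_REPORTER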
http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
index 1020f6a..3e9a90e 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-site.xml
@@ -720,4 +720,11 @@
"storm dev-zookeeper". This zookeeper instance is only intended for development;
it is not a production grade zookeeper setup.</description>
</property>
+ <property>
+ <name>metrics.reporter.register</name>
+ <description>Topology metrics reporter.</description>
+ <value-attributes>
+ <empty-value-valid>true</empty-value-valid>
+ </value-attributes>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index bd25f7d..935e88b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -86,6 +86,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
"MAPREDUCE2": self.recommendMapReduce2Configurations,
"HDFS": self.recommendHDFSConfigurations,
"HBASE": self.recommendHbaseConfigurations,
+ "STORM": self.recommendStormConfigurations,
"AMBARI_METRICS": self.recommendAmsConfigurations,
"RANGER": self.recommendRangerConfigurations
}
@@ -439,6 +440,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count
+ def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
+ putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+ # Storm AMS integration
+ if 'AMBARI_METRICS' in servicesList:
+ putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter')
+
def recommendAmsConfigurations(self, configurations, clusterData, services, hosts):
putAmsEnvProperty = self.putProperty(configurations, "ams-env", services)
putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services)
@@ -757,6 +765,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
"MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations},
"YARN": {"yarn-site": self.validateYARNConfigurations},
"HBASE": {"hbase-env": self.validateHbaseEnvConfigurations},
+ "STORM": {"storm-site": self.validateStormConfigurations},
"AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations,
"ams-hbase-env": self.validateAmsHbaseEnvConfigurations,
"ams-site": self.validateAmsSiteConfigurations}
@@ -889,6 +898,19 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")
+ def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+ validationItems = []
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+ # Storm AMS integration
+ if 'AMBARI_METRICS' in servicesList and \
+ "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter" not in properties.get("metrics.reporter.register"):
+
+ validationItems.append({"config-name": 'metrics.reporter.register',
+ "item": self.getWarnItem(
+ "Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter to report the metrics to Ambari Metrics service.")})
+
+ return self.toConfigurationValidationProblems(validationItems, "storm-site")
+
def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added
masterItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")
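For reference, a hedged sketch of how the two new hooks behave, using the same
minimal 'services' shape as the Python tests further down ('advisor' stands in
for an HDP206StackAdvisor instance and is an assumption of this sketch):

  configurations = {}
  services = {
      "services": [{"StackServices": {"service_name": "AMBARI_METRICS"}}],
      "configurations": configurations
  }

  # Recommendation: with AMBARI_METRICS present, storm-site gains the AMS sink.
  advisor.recommendStormConfigurations(configurations, None, services, None)
  # configurations == {"storm-site": {"properties": {"metrics.reporter.register":
  #     "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"}}}

  # Validation: an empty value while AMS is present yields one WARN item.
  properties = {"metrics.reporter.register": ""}
  issues = advisor.validateStormConfigurations(properties, {}, configurations, services, None)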
http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index b51af8b..007abb8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -784,8 +784,8 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
putTezProperty("tez.tez-ui.history-url.base", tez_url)
pass
-
def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
+ super(HDP22StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts)
putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site")
storm_site = getServicesSiteProperties(services, "storm-site")
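The added super() call is the substance of this hunk: HDP22StackAdvisor
previously shadowed the base recommendStormConfigurations entirely, so the AMS
reporter recommendation from HDP 2.0.6 was never applied on 2.2 stacks. A
schematic sketch of the delegation (the intermediate HDP21StackAdvisor in the
real class hierarchy is elided here):

  class HDP206StackAdvisor(object):
      def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
          # base behaviour: register StormTimelineMetricsReporter when
          # AMBARI_METRICS is among the blueprint's services
          pass

  class HDP22StackAdvisor(HDP206StackAdvisor):
      def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
          # chain to the base first, then layer on 2.2-specific storm-site tuning
          super(HDP22StackAdvisor, self).recommendStormConfigurations(
              configurations, clusterData, services, hosts)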
http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index 1e75dbf..6d2758a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -153,6 +153,14 @@ public class BlueprintConfigurationProcessorTest {
atlasComponents.add("ATLAS_CLIENT");
serviceComponents.put("ATLAS", atlasComponents);
+ Collection<String> amsComponents = new HashSet<String>();
+ amsComponents.add("METRICS_COLLECTOR");
+ serviceComponents.put("AMBARI_METRICS", amsComponents);
+
+ Collection<String> stormComponents = new HashSet<String>();
+ stormComponents.add("NIMBUS");
+ serviceComponents.put("STORM", stormComponents);
+
for (Map.Entry<String, Collection<String>> entry : serviceComponents.entrySet()) {
String service = entry.getKey();
for (String component : entry.getValue()) {
@@ -5217,6 +5225,132 @@ public class BlueprintConfigurationProcessorTest {
}
@Test
+ public void testStormAmsPropertiesDefault() throws Exception {
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+ Map<String, String> stormSite = new HashMap<String, String>();
+ //default
+ stormSite.put("metrics.reporter.register", "");
+ properties.put("storm-site", stormSite);
+
+ Map<String, Map<String, String>> parentProperties = new HashMap<String, Map<String, String>>();
+ Configuration parentClusterConfig = new Configuration(parentProperties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap());
+ Configuration clusterConfig = new Configuration(properties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+ Collection<String> hgComponents1 = new HashSet<String>();
+ hgComponents1.add("METRICS_COLLECTOR");
+ hgComponents1.add("NIMBUS");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1"));
+
+ Collection<TestHostGroup> hostGroups = Collections.singletonList(group1);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+ configProcessor.doUpdateForClusterCreate();
+
+ assertEquals("org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter",
+ clusterConfig.getPropertyValue("storm-site", "metrics.reporter.register"));
+ }
+
+ @Test
+ public void testStormAmsPropertiesUserDefinedReporter() throws Exception {
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+ Map<String, String> stormSite = new HashMap<String, String>();
+ //user-defined reporter
+ stormSite.put("metrics.reporter.register", "user.Reporter");
+ properties.put("storm-site", stormSite);
+
+ Map<String, Map<String, String>> parentProperties = new HashMap<String, Map<String, String>>();
+ Configuration parentClusterConfig = new Configuration(parentProperties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap());
+ Configuration clusterConfig = new Configuration(properties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+ Collection<String> hgComponents1 = new HashSet<String>();
+ hgComponents1.add("METRICS_COLLECTOR");
+ hgComponents1.add("NIMBUS");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1"));
+
+ Collection<TestHostGroup> hostGroups = Collections.singletonList(group1);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+ configProcessor.doUpdateForClusterCreate();
+
+ assertEquals("user.Reporter",
+ clusterConfig.getPropertyValue("storm-site", "metrics.reporter.register"));
+ }
+
+ @Test
+ public void testKafkaAmsProperties() throws Exception {
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+ Map<String, String> kafkaBroker = new HashMap<String, String>();
+ //default
+ kafkaBroker.put("kafka.metrics.reporters", "");
+ properties.put("kafka-broker", kafkaBroker);
+
+ Map<String, Map<String, String>> parentProperties = new HashMap<String, Map<String, String>>();
+ Configuration parentClusterConfig = new Configuration(parentProperties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap());
+ Configuration clusterConfig = new Configuration(properties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+ Collection<String> hgComponents1 = new HashSet<String>();
+ hgComponents1.add("METRICS_COLLECTOR");
+ hgComponents1.add("KAFKA_BROKER");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1"));
+
+ Collection<TestHostGroup> hostGroups = Collections.singletonList(group1);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+ configProcessor.doUpdateForClusterCreate();
+
+ assertEquals("org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter",
+ clusterConfig.getPropertyValue("kafka-broker", "kafka.metrics.reporters"));
+
+ }
+
+ @Test
+ public void testKafkaAmsPropertiesMultipleReporters() throws Exception {
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+
+ Map<String, String> kafkaBroker = new HashMap<String, String>();
+ //user-defined reporter
+ kafkaBroker.put("kafka.metrics.reporters", "user.Reporter");
+ properties.put("kafka-broker", kafkaBroker);
+
+ Map<String, Map<String, String>> parentProperties = new HashMap<String, Map<String, String>>();
+ Configuration parentClusterConfig = new Configuration(parentProperties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap());
+ Configuration clusterConfig = new Configuration(properties,
+ Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+ Collection<String> hgComponents1 = new HashSet<String>();
+ hgComponents1.add("METRICS_COLLECTOR");
+ hgComponents1.add("KAFKA_BROKER");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1"));
+
+ Collection<TestHostGroup> hostGroups = Collections.singletonList(group1);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+ configProcessor.doUpdateForClusterCreate();
+
+ assertEquals("user.Reporter,org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter",
+ clusterConfig.getPropertyValue("kafka-broker", "kafka.metrics.reporters"));
+
+ }
+
+ @Test
public void testAtlasHiveProperties2() throws Exception {
Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
Map<String, String> atlasProperties = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/ed2d58a6/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
index a28f764..456977c 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
@@ -465,6 +465,49 @@ class TestHDP206StackAdvisor(TestCase):
self.assertEquals(result, expected)
+ def test_recommendStormConfigurations(self):
+ # no AMS
+ configurations = {}
+ services = {
+ "services": [
+ ],
+ "configurations": configurations
+ }
+
+ expected = {
+ "storm-site": {
+ "properties": {
+ }
+ },
+ }
+
+ self.stackAdvisor.recommendStormConfigurations(configurations, None, services, None)
+ self.assertEquals(configurations, expected)
+
+ # with AMS
+ configurations = {}
+ services = {
+ "services": [
+ {
+ "StackServices": {
+ "service_name": "AMBARI_METRICS"
+ }
+ }
+ ],
+ "configurations": configurations
+ }
+
+ expected = {
+ "storm-site": {
+ "properties": {
+ "metrics.reporter.register": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"
+ }
+ },
+ }
+
+ self.stackAdvisor.recommendStormConfigurations(configurations, None, services, None)
+ self.assertEquals(configurations, expected)
+
def test_recommendYARNConfigurations(self):
configurations = {}
services = {"configurations": configurations}
@@ -1379,6 +1422,51 @@ class TestHDP206StackAdvisor(TestCase):
]
self.assertEquals(res, expected)
+ def test_validateStormSiteConfigurations(self):
+ configurations = {
+ "storm-site": {
+ "properties": {
+ 'metrics.reporter.register': "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"
+ }
+ }
+ }
+
+ recommendedDefaults = {
+ 'metrics.reporter.register': 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter',
+ }
+ properties = {
+ 'metrics.reporter.register': 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter',
+ }
+
+ services = {
+ "services": [
+ {
+ "StackServices": {
+ "service_name": "AMBARI_METRICS"
+ }
+ }
+ ],
+ "configurations": configurations
+ }
+
+ # positive
+ res = self.stackAdvisor.validateStormConfigurations(properties, recommendedDefaults, configurations, services, None)
+ expected = []
+ self.assertEquals(res, expected)
+ properties['metrics.reporter.register'] = ''
+
+ res = self.stackAdvisor.validateStormConfigurations(properties, recommendedDefaults, configurations, services, None)
+ expected = [
+ {'config-name': 'metrics.reporter.register',
+ 'config-type': 'storm-site',
+ 'level': 'WARN',
+ 'message': 'Should be set to org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter '
+ 'to report the metrics to Ambari Metrics service.',
+ 'type': 'configuration'
+ }
+ ]
+ self.assertEquals(res, expected)
+
def test_getHostsWithComponent(self):
services = {"services":
[{"StackServices":