You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/03/04 20:41:54 UTC
ambari git commit: Capture HDFS metrics per RPC port number in AMS
and Grafana. (swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-2.2 bda06cfbf -> 75d284ab1
Capture HDFS metrics per RPC port number in AMS and Grafana. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75d284ab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75d284ab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75d284ab
Branch: refs/heads/branch-2.2
Commit: 75d284ab195fc18c87c55680e39c46337db0d5c0
Parents: bda06cf
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Mar 4 11:41:38 2016 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Mar 4 11:41:44 2016 -0800
----------------------------------------------------------------------
.../timeline/AbstractTimelineMetricsSink.java | 6 +-
.../timeline/HadoopTimelineMetricsSink.java | 112 +++++++----
.../timeline/HadoopTimelineMetricsSinkTest.java | 184 +++++++++++++++++--
.../server/api/services/AmbariMetaInfo.java | 96 ++++++----
.../metrics/timeline/AMSPropertyProvider.java | 27 ++-
.../controller/utilities/PropertyHelper.java | 77 ++++++++
.../server/upgrade/UpgradeCatalog222.java | 90 +++++++++
.../common-services/HDFS/2.1.0.2.0/widgets.json | 58 ++++--
.../2.0.6/hooks/before-START/scripts/params.py | 58 ++++++
.../templates/hadoop-metrics2.properties.j2 | 11 ++
.../StackArtifactResourceProviderTest.java | 50 ++++-
.../utilities/PropertyHelperTest.java | 14 ++
.../server/upgrade/UpgradeCatalog222Test.java | 89 +++++++--
13 files changed, 757 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index b2810b7..28d3b9c 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -47,6 +47,9 @@ public abstract class AbstractTimelineMetricsSink {
public static final String COLLECTOR_PROPERTY = "collector";
public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative";
+ public static final String RPC_METRIC_PREFIX = "metric.rpc";
+ public static final String RPC_METRIC_NAME_SUFFIX = "suffix";
+ public static final String RPC_METRIC_PORT_SUFFIX = "port";
public static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
@@ -78,8 +81,7 @@ public abstract class AbstractTimelineMetricsSink {
HttpURLConnection connection = null;
try {
if (connectUrl == null) {
- throw new IOException("Unknown URL. " +
- "Unable to connect to metrics collector.");
+ throw new IOException("Unknown URL. Unable to connect to metrics collector.");
}
String jsonData = mapper.writeValueAsString(metrics);
connection = connectUrl.startsWith("https") ?
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 6da9257..db8791f 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -17,31 +17,31 @@
*/
package org.apache.hadoop.metrics2.sink.timeline;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.impl.MsInfo;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.util.Servers;
import org.apache.hadoop.net.DNS;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink {
@@ -54,9 +54,13 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
private static final String SERVICE_NAME_PREFIX = "serviceName-prefix";
private static final String SERVICE_NAME = "serviceName";
private int timeoutSeconds = 10;
+ private SubsetConfiguration conf;
+ // Cache the rpc port used and the suffix to use if the port tag is found
+ private Map<String, String> rpcPortSuffixes = new HashMap<>(10);
@Override
public void init(SubsetConfiguration conf) {
+ this.conf = conf;
LOG.info("Initializing Timeline metrics sink.");
// Take the hostname from the DNS class.
@@ -83,8 +87,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
if (metricsServers == null || metricsServers.isEmpty()) {
LOG.error("No Metric collector configured.");
} else {
- collectorUri = conf.getString(COLLECTOR_PROPERTY).trim()
- + WS_V1_TIMELINE_METRICS;
+ collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_TIMELINE_METRICS;
if (collectorUri.toLowerCase().startsWith("https://")) {
String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
@@ -109,27 +112,40 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
Iterator<String> it = (Iterator<String>) conf.getKeys();
while (it.hasNext()) {
String propertyName = it.next();
- if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
- String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
- String[] tags = conf.getStringArray(propertyName);
- boolean useAllTags = false;
- Set<String> set = null;
- if (tags.length > 0) {
- set = new HashSet<String>();
- for (String tag : tags) {
- tag = tag.trim();
- useAllTags |= tag.equals("*");
- if (tag.length() > 0) {
- set.add(tag);
+ if (propertyName != null) {
+ if (propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
+ String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
+ String[] tags = conf.getStringArray(propertyName);
+ boolean useAllTags = false;
+ Set<String> set = null;
+ if (tags.length > 0) {
+ set = new HashSet<String>();
+ for (String tag : tags) {
+ tag = tag.trim();
+ useAllTags |= tag.equals("*");
+ if (tag.length() > 0) {
+ set.add(tag);
+ }
+ }
+ if (useAllTags) {
+ set = null;
}
}
- if (useAllTags) {
- set = null;
- }
+ useTagsMap.put(contextName, set);
+ }
+ // Customized RPC ports
+ if (propertyName.startsWith(RPC_METRIC_PREFIX)) {
+ // metric.rpc.client.port
+ int beginIdx = RPC_METRIC_PREFIX.length() + 1;
+ String suffixStr = propertyName.substring(beginIdx); // client.port
+ String configPrefix = suffixStr.substring(0, suffixStr.indexOf(".")); // client
+ rpcPortSuffixes.put(conf.getString(propertyName).trim(), configPrefix.trim());
}
- useTagsMap.put(contextName, set);
}
}
+ if (!rpcPortSuffixes.isEmpty()) {
+ LOG.info("RPC port properties configured: " + rpcPortSuffixes);
+ }
}
/**
@@ -172,14 +188,41 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
StringBuilder sb = new StringBuilder();
sb.append(contextName);
sb.append('.');
- sb.append(recordName);
+ // Similar to GangliaContext adding processName to distinguish jvm
+ // metrics for co-hosted daemons. We only do this for HBase since the
+ // appId is shared for Master and RS.
+ if (contextName.equals("jvm")) {
+ if (record.tags() != null) {
+ for (MetricsTag tag : record.tags()) {
+ if (tag.info().name().equalsIgnoreCase("processName") &&
+ (tag.value().equals("RegionServer") || tag.value().equals("Master"))) {
+ sb.append(tag.value());
+ sb.append('.');
+ }
+ }
+ }
+ }
+ sb.append(recordName);
appendPrefix(record, sb);
- sb.append(".");
+ sb.append('.');
+
+ // Add port tag for rpc metrics to distinguish rpc calls based on port
+ if (!rpcPortSuffixes.isEmpty() && contextName.contains("rpc")) {
+ if (record.tags() != null) {
+ for (MetricsTag tag : record.tags()) {
+ if (tag.info().name().equalsIgnoreCase("port") &&
+ rpcPortSuffixes.keySet().contains(tag.value())) {
+ sb.append(rpcPortSuffixes.get(tag.value()));
+ sb.append('.');
+ }
+ }
+ }
+ }
+
int sbBaseLen = sb.length();
- Collection<AbstractMetric> metrics =
- (Collection<AbstractMetric>) record.metrics();
+ Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record.metrics();
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
long startTime = record.timestamp();
@@ -247,4 +290,5 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
public void flush() {
// TODO: Buffering implementation
}
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 528384e..4a5abcc 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -18,6 +18,34 @@
package org.apache.hadoop.metrics2.sink.timeline;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROPERTY;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
@@ -31,28 +59,14 @@ import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
-import java.io.OutputStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricType;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
@RunWith(PowerMockRunner.class)
public class HadoopTimelineMetricsSinkTest {
+ @Before
+ public void setup() {
+ Logger.getLogger("org.apache.hadoop.metrics2.sink.timeline").setLevel(Level.DEBUG);
+ }
+
@Test
@PrepareForTest({URL.class, OutputStream.class})
public void testPutMetrics() throws Exception {
@@ -241,5 +255,137 @@ public class HadoopTimelineMetricsSinkTest {
Assert.assertEquals(new Double(6.0), values.next());
}
+ @Test
+ public void testRPCPortSuffixHandledCorrectly() throws Exception {
+ HadoopTimelineMetricsSink sink =
+ createMockBuilder(HadoopTimelineMetricsSink.class)
+ .withConstructor().addMockedMethod("appendPrefix")
+ .addMockedMethod("emitMetrics").createNiceMock();
+
+ SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
+ expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+ expect(conf.getParent()).andReturn(null).anyTimes();
+ expect(conf.getPrefix()).andReturn("service").anyTimes();
+ expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+ expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+
+ expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
+ expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+
+ conf.setListDelimiter(eq(','));
+ expectLastCall().anyTimes();
+
+ Set<String> rpcPortSuffixes = new HashSet<String>() {{
+ add("metric.rpc.client.port");
+ add("metric.rpc.datanode.port");
+ add("metric.rpc.healthcheck.port");
+ }};
+
+ expect(conf.getKeys()).andReturn(rpcPortSuffixes.iterator());
+ expect(conf.getString("metric.rpc.client.port")).andReturn("8020");
+ expect(conf.getString("metric.rpc.datanode.port")).andReturn("8040");
+ expect(conf.getString("metric.rpc.healthcheck.port")).andReturn("8060");
+ AbstractMetric metric = createNiceMock(AbstractMetric.class);
+ expect(metric.name()).andReturn("rpc.metricName").anyTimes();
+ expect(metric.value()).andReturn(1.0).once();
+ expect(metric.value()).andReturn(2.0).once();
+ expect(metric.value()).andReturn(3.0).once();
+ expect(metric.value()).andReturn(4.0).once();
+ expect(metric.value()).andReturn(5.0).once();
+ expect(metric.value()).andReturn(6.0).once();
+
+ MetricsRecord record = createNiceMock(MetricsRecord.class);
+ expect(record.name()).andReturn("testMetric").anyTimes();
+ expect(record.context()).andReturn("rpc").anyTimes();
+ Collection<MetricsTag> tags1 = Collections.singletonList(
+ new MetricsTag(new MetricsInfo() {
+ @Override
+ public String name() {
+ return "port";
+ }
+
+ @Override
+ public String description() {
+ return null;
+ }
+ }, "8020")
+ );
+ Collection<MetricsTag> tags2 = Collections.singletonList(
+ new MetricsTag(new MetricsInfo() {
+ @Override
+ public String name() {
+ return "port";
+ }
+
+ @Override
+ public String description() {
+ return null;
+ }
+ }, "8040")
+ );
+ expect(record.tags()).andReturn(tags1).times(6);
+ expect(record.tags()).andReturn(tags2).times(6);
+
+ sink.appendPrefix(eq(record), (StringBuilder) anyObject());
+ expectLastCall().anyTimes().andStubAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ return null;
+ }
+ });
+
+ final Long now = System.currentTimeMillis();
+ // TODO: Current implementation of cache needs > 1 elements to evict any
+ expect(record.timestamp()).andReturn(now).times(2);
+ expect(record.timestamp()).andReturn(now + 100l).times(2);
+ expect(record.timestamp()).andReturn(now + 200l).once();
+ expect(record.timestamp()).andReturn(now + 300l).once();
+
+ expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
+
+ final List<TimelineMetrics> capturedMetrics = new ArrayList<TimelineMetrics>();
+ sink.emitMetrics((TimelineMetrics) anyObject());
+ expectLastCall().andStubAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ capturedMetrics.add((TimelineMetrics) EasyMock.getCurrentArguments()[0]);
+ return null;
+ }
+ });
+
+ replay(conf, sink, record, metric);
+
+ sink.init(conf);
+
+ // time = t1
+ sink.putMetrics(record);
+ // time = t1
+ sink.putMetrics(record);
+ // time = t2
+ sink.putMetrics(record);
+ // Evict
+ // time = t2
+ sink.putMetrics(record);
+ // time = t3
+ sink.putMetrics(record);
+ // time = t4
+ sink.putMetrics(record);
+
+ verify(conf, sink, record, metric);
+
+ Assert.assertEquals(2, capturedMetrics.size());
+ Iterator<TimelineMetrics> metricsIterator = capturedMetrics.iterator();
+
+ // t1, t2
+ TimelineMetric timelineMetric1 = metricsIterator.next().getMetrics().get(0);
+ Assert.assertEquals(2, timelineMetric1.getMetricValues().size());
+ // Assert the tag added to the name
+ Assert.assertEquals("rpc.testMetric.client.rpc.metricName", timelineMetric1.getMetricName());
+ // t3, t4
+ TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
+ Assert.assertEquals(2, timelineMetric2.getMetricValues().size());
+ // Assert the tag added to the name
+ Assert.assertEquals("rpc.testMetric.datanode.rpc.metricName", timelineMetric2.getMetricName());
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index e35e7ac..b242097 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -27,6 +27,7 @@ import org.apache.ambari.server.ParentObjectNotFoundException;
import org.apache.ambari.server.StackAccessException;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
import org.apache.ambari.server.customactions.ActionDefinition;
import org.apache.ambari.server.customactions.ActionDefinitionManager;
import org.apache.ambari.server.events.AlertDefinitionDisabledEvent;
@@ -334,7 +335,7 @@ public class AmbariMetaInfo {
DependencyInfo foundDependency = null;
List<DependencyInfo> componentDependencies = getComponentDependencies(
- stackName, version, service, component);
+ stackName, version, service, component);
Iterator<DependencyInfo> iter = componentDependencies.iterator();
while (foundDependency == null && iter.hasNext()) {
DependencyInfo dependency = iter.next();
@@ -362,7 +363,7 @@ public class AmbariMetaInfo {
for (RepositoryInfo repo : repository) {
if (!reposResult.containsKey(repo.getOsType())) {
reposResult.put(repo.getOsType(),
- new ArrayList<RepositoryInfo>());
+ new ArrayList<RepositoryInfo>());
}
reposResult.get(repo.getOsType()).add(repo);
}
@@ -878,7 +879,7 @@ public class AmbariMetaInfo {
try {
map = gson.fromJson(new FileReader(svc.getMetricsFile()), type);
- svc.setMetrics(updateComponentMetricMapWithAggregateFunctionIds(map));
+ svc.setMetrics(processMetricDefinition(map));
} catch (Exception e) {
LOG.error ("Could not read the metrics file", e);
@@ -891,43 +892,52 @@ public class AmbariMetaInfo {
/**
* Add aggregate function support for all stack defined metrics.
+ *
+ * Refactor Namenode RPC metrics for different kinds of ports.
*/
- private Map<String, Map<String, List<MetricDefinition>>> updateComponentMetricMapWithAggregateFunctionIds(
- Map<String, Map<String, List<MetricDefinition>>> metricMap) {
+ private Map<String, Map<String, List<MetricDefinition>>> processMetricDefinition(
+ Map<String, Map<String, List<MetricDefinition>>> metricMap) {
if (!metricMap.isEmpty()) {
// For every Component
- for (Map<String, List<MetricDefinition>> componentMetricDef : metricMap.values()) {
+ for (Map.Entry<String, Map<String, List<MetricDefinition>>> componentMetricDefEntry : metricMap.entrySet()) {
+ String componentName = componentMetricDefEntry.getKey();
// For every Component / HostComponent category
- for (Map.Entry<String, List<MetricDefinition>> metricDefEntry : componentMetricDef.entrySet()) {
- // NOTE: Only Component aggregates supported for now.
- if (metricDefEntry.getKey().equals(Component.name())) {
- //For every metric definition
- for (MetricDefinition metricDefinition : metricDefEntry.getValue()) {
- // Metrics System metrics only
- if (metricDefinition.getType().equals("ganglia")) {
- // Create a new map for each category
- for (Map<String, Metric> metricByCategory : metricDefinition.getMetricsByCategory().values()) {
- Map<String, Metric> newMetrics = new HashMap<String, Metric>();
-
- // For every function id
- for (String identifierToAdd : AGGREGATE_FUNCTION_IDENTIFIERS) {
-
- for (Map.Entry<String, Metric> metricEntry : metricByCategory.entrySet()) {
- String newMetricKey = metricEntry.getKey() + identifierToAdd;
- Metric currentMetric = metricEntry.getValue();
- Metric newMetric = new Metric(
- currentMetric.getName() + identifierToAdd,
- currentMetric.isPointInTime(),
- currentMetric.isTemporal(),
- currentMetric.isAmsHostMetric(),
- currentMetric.getUnit()
- );
- newMetrics.put(newMetricKey, newMetric);
+ for (Map.Entry<String, List<MetricDefinition>> metricDefEntry : componentMetricDefEntry.getValue().entrySet()) {
+ //For every metric definition
+ for (MetricDefinition metricDefinition : metricDefEntry.getValue()) {
+ // Metrics System metrics only
+ if (metricDefinition.getType().equals("ganglia")) {
+ for (Map.Entry<String, Map<String, Metric>> metricByCategory : metricDefinition.getMetricsByCategory().entrySet()) {
+ String category = metricByCategory.getKey();
+ Iterator<Map.Entry<String, Metric>> iterator = metricByCategory.getValue().entrySet().iterator();
+ Map<String, Metric> newMetricsToAdd = new HashMap<>();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, Metric> metricEntry = iterator.next();
+ // Process Namenode rpc metrics
+ Map<String, Metric> replacementMetrics = PropertyHelper.processRpcMetricDefinition(
+ componentName, metricEntry.getKey(), metricEntry.getValue());
+ if (replacementMetrics != null) {
+ iterator.remove(); // Remove current metric entry
+ newMetricsToAdd.putAll(replacementMetrics);
+ // Add aggregate functions for replacement metrics
+ if (metricDefEntry.getKey().equals(Component.name())) {
+ for (Map.Entry<String, Metric> replacementMetric : replacementMetrics.entrySet()) {
+ newMetricsToAdd.putAll(getAggregateFunctionMetrics(replacementMetric.getKey(),
+ replacementMetric.getValue()));
+ }
+ }
+ } else {
+ // NOTE: Only Component aggregates supported for now.
+ if (metricDefEntry.getKey().equals(Component.name())) {
+ Map<String, Metric> aggregateFunctionMetrics =
+ getAggregateFunctionMetrics(metricEntry.getKey(), metricEntry.getValue());
+ newMetricsToAdd.putAll(aggregateFunctionMetrics);
}
}
- metricByCategory.putAll(newMetrics);
}
+ metricByCategory.getValue().putAll(newMetricsToAdd);
}
}
}
@@ -938,6 +948,24 @@ public class AmbariMetaInfo {
return metricMap;
}
+ private Map<String, Metric> getAggregateFunctionMetrics(String metricName, Metric currentMetric) {
+ Map<String, Metric> newMetrics = new HashMap<String, Metric>();
+ // For every function id
+ for (String identifierToAdd : AGGREGATE_FUNCTION_IDENTIFIERS) {
+ String newMetricKey = metricName + identifierToAdd;
+ Metric newMetric = new Metric(
+ currentMetric.getName() + identifierToAdd,
+ currentMetric.isPointInTime(),
+ currentMetric.isTemporal(),
+ currentMetric.isAmsHostMetric(),
+ currentMetric.getUnit()
+ );
+ newMetrics.put(newMetricKey, newMetric);
+ }
+
+ return newMetrics;
+ }
+
/**
* Gets the metrics for a Role (component).
* @return the list of defined metrics.
@@ -946,8 +974,8 @@ public class AmbariMetaInfo {
String serviceName, String componentName, String metricType)
throws AmbariException {
- Map<String, Map<String, List<MetricDefinition>>> map =
- getServiceMetrics(stackName, stackVersion, serviceName);
+ Map<String, Map<String, List<MetricDefinition>>> map = getServiceMetrics(
+ stackName, stackVersion, serviceName);
if (map != null && map.containsKey(componentName)) {
if (map.get(componentName).containsKey(metricType)) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index c1b9c50..9e5c1b6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -69,6 +69,13 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
private static AtomicInteger printSkipPopulateMsgHostCompCounter = new AtomicInteger(0);
private static final Map<String, String> timelineAppIdCache = new ConcurrentHashMap<>(10);
+ private static final Map<String, String> JVM_PROCESS_NAMES = new HashMap<>(2);
+
+ static {
+ JVM_PROCESS_NAMES.put("HBASE_MASTER", "Master.");
+ JVM_PROCESS_NAMES.put("HBASE_REGIONSERVER", "RegionServer.");
+ }
+
public AMSPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
URLStreamProvider streamProvider,
ComponentSSLConfiguration configuration,
@@ -411,6 +418,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
if (metricsMap != null) {
for (String propertyId : propertyIdSet) {
if (propertyId != null) {
+// propertyId = postProcessPropertyId(propertyId, getComponentName(resource));
if (metricsMap.containsKey(propertyId)){
if (containsArguments(propertyId)) {
int i = 1;
@@ -614,7 +622,9 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
requests.put(temporalInfo, metricsRequest);
}
metricsRequest.putResource(getComponentName(resource), resource);
- metricsRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId);
+ metricsRequest.putPropertyId(
+ preprocessPropertyId(propertyInfo.getPropertyId(), getComponentName(resource)),
+ propertyId);
// If request is for a host metric we need to create multiple requests
if (propertyInfo.isAmsHostMetric()) {
metricsRequest.putHosComponentHostMetric(propertyInfo.getPropertyId());
@@ -627,6 +637,21 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
return requestMap;
}
+ /**
+ * Account for the processName added to the jvm metrics by the HadoopSink.
+ * E.g.: jvm.RegionServer.JvmMetrics.GcTimeMillis
+ *
+ */
+ private String preprocessPropertyId(String propertyId, String componentName) {
+ if (propertyId.startsWith("jvm") && JVM_PROCESS_NAMES.keySet().contains(componentName)) {
+ String newPropertyId = propertyId.replace("jvm.", "jvm." + JVM_PROCESS_NAMES.get(componentName));
+ LOG.debug("Pre-process: " + propertyId + ", to: " + newPropertyId);
+ return newPropertyId;
+ }
+
+ return propertyId;
+ }
+
static URIBuilder getAMSUriBuilder(String hostname, int port, boolean httpsEnabled) {
URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setScheme(httpsEnabled ? "https" : "http");
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
index cefe953..41da279 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
@@ -36,6 +36,7 @@ import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.state.stack.Metric;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -67,6 +68,9 @@ public class PropertyHelper {
private static final Map<Resource.InternalType, Map<String, Map<String, PropertyInfo>>> SQLSERVER_PROPERTY_IDS = readPropertyProviderIds(SQLSERVER_PROPERTIES_FILE);
private static final Map<Resource.InternalType, Map<Resource.Type, String>> KEY_PROPERTY_IDS = readKeyPropertyIds(KEY_PROPERTIES_FILE);
+ // Suffixes to add for Namenode rpc metrics prefixes
+ private static final Map<String, List<String>> RPC_METRIC_SUFFIXES = new HashMap<>();
+
/**
* Regular expression to check for replacement arguments (e.g. $1) in a property id.
*/
@@ -92,6 +96,11 @@ public class PropertyHelper {
*/
private static final Pattern METRIC_CATEGORY_TOKENIZE_REGEX = Pattern.compile("/+(?=([^\"\\\\\\\\]*(\\\\\\\\.|\"([^\"\\\\\\\\]*\\\\\\\\.)*[^\"\\\\\\\\]*\"))*[^\"]*$)");
+ static {
+ RPC_METRIC_SUFFIXES.put("rpc.rpc", Arrays.asList("client", "datanode", "healthcheck"));
+ RPC_METRIC_SUFFIXES.put("rpcdetailed.rpcdetailed", Arrays.asList("client", "datanode", "healthcheck"));
+ }
+
public static String getPropertyId(String category, String name) {
String propertyId = (category == null || category.isEmpty())? name :
(name == null || name.isEmpty()) ? category : category + EXTERNAL_PATH_SEP + name;
@@ -626,4 +635,72 @@ public class PropertyHelper {
}
return false;
}
+
+
+ /**
+ * Special handle rpc port tags added to metric names for HDFS Namenode
+ *
+ * Returns the replacement definitions
+ */
+ public static Map<String, org.apache.ambari.server.state.stack.Metric> processRpcMetricDefinition(
+ String componentName, String propertyId, org.apache.ambari.server.state.stack.Metric metric) {
+ Map<String, org.apache.ambari.server.state.stack.Metric> replacementMap = null;
+ if (componentName.equalsIgnoreCase("NAMENODE")) {
+ for (Map.Entry<String, List<String>> entry : RPC_METRIC_SUFFIXES.entrySet()) {
+ String prefix = entry.getKey();
+ if (metric.getName().startsWith(prefix)) {
+ replacementMap = new HashMap<>();
+ for (String suffix : entry.getValue()) {
+ org.apache.ambari.server.state.stack.Metric newMetric = new org.apache.ambari.server.state.stack.Metric(
+ insertTagInToMetricName(suffix, metric.getName(), prefix),
+ metric.isPointInTime(),
+ metric.isTemporal(),
+ metric.isAmsHostMetric(),
+ metric.getUnit()
+ );
+
+ replacementMap.put(insertTagInToMetricName(suffix, propertyId, prefix), newMetric);
+ }
+ }
+ }
+ }
+ return replacementMap;
+ }
+
+ /**
+ * Returns tag inserted metric name after the prefix.
+ * @param tag E.g.: client
+ * @param metricName : rpc.rpc.CallQueueLength Or metrics/rpc/CallQueueLen
+ * @param prefix : rpc.rpc
+ * @return rpc.rpc.client.CallQueueLength Or metrics/rpc/client/CallQueueLen
+ */
+ static String insertTagInToMetricName(String tag, String metricName, String prefix) {
+ String sepExpr = "\\.";
+ String seperator = ".";
+ if (metricName.indexOf(EXTERNAL_PATH_SEP) != -1) {
+ sepExpr = Character.toString(EXTERNAL_PATH_SEP);
+ seperator = sepExpr;
+ }
+ String prefixSep = prefix.contains(".") ? "\\." : "" + EXTERNAL_PATH_SEP;
+
+ // Remove separator if any
+ if (prefix.substring(prefix.length() - 1).equals(prefixSep)) {
+ prefix = prefix.substring(0, prefix.length() - 1);
+ }
+ int pos = prefix.split(prefixSep).length - 1;
+ String[] parts = metricName.split(sepExpr);
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < parts.length; i++) {
+ sb.append(parts[i]);
+ if (i < parts.length - 1) {
+ sb.append(seperator);
+ }
+ if (i == pos) { // append the tag
+ sb.append(tag);
+ sb.append(seperator);
+ }
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
index 8ea455d..78cf8bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java
@@ -18,25 +18,38 @@
package org.apache.ambari.server.upgrade;
+import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.WidgetDAO;
import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.WidgetEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
+import org.apache.ambari.server.state.stack.WidgetLayout;
+import org.apache.ambari.server.state.stack.WidgetLayoutInfo;
import org.apache.ambari.server.utils.VersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileReader;
+import java.lang.reflect.Type;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,6 +93,9 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog {
public static final String CLUSTER_HOUR_TABLE_TTL = "timeline.metrics.cluster.aggregator.hourly.ttl";
public static final String CLUSTER_DAILY_TABLE_TTL = "timeline.metrics.cluster.aggregator.daily.ttl";
+ private static final String[] HDFS_WIDGETS_TO_UPDATE = new String[] {
+ "NameNode RPC", "NN Connection Load" };
+
// ----- Constructors ------------------------------------------------------
@@ -133,6 +149,7 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog {
updateAMSConfigs();
updateHiveConfig();
updateHostRoleCommands();
+ updateHDFSWidgetDefinition();
}
protected void updateStormConfigs() throws AmbariException {
@@ -336,6 +353,79 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog {
}
}
+ protected void updateHDFSWidgetDefinition() throws AmbariException {
+ LOG.info("Updating HDFS widget definition.");
+ AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+ AmbariMetaInfo ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
+ Type widgetLayoutType = new TypeToken<Map<String, List<WidgetLayout>>>(){}.getType();
+ Gson gson = injector.getInstance(Gson.class);
+ WidgetDAO widgetDAO = injector.getInstance(WidgetDAO.class);
+
+ Clusters clusters = ambariManagementController.getClusters();
+
+ Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+ for (final Cluster cluster : clusterMap.values()) {
+ long clusterID = cluster.getClusterId();
+
+ for (String widgetName : HDFS_WIDGETS_TO_UPDATE) {
+ List<WidgetEntity> widgetEntities = widgetDAO.findByName(clusterID,
+ widgetName, "ambari", "HDFS_SUMMARY");
+
+ if (widgetEntities != null) {
+ WidgetEntity entityToUpdate = null;
+ if (widgetEntities.size() > 1) {
+ LOG.info("Found more that 1 entity with name = "+ widgetName +
+ " for cluster = " + cluster.getClusterName() + ", skipping update.");
+ } else {
+ entityToUpdate = widgetEntities.iterator().next();
+ }
+ if (entityToUpdate != null) {
+ LOG.info("Updating widget: " + entityToUpdate.getWidgetName());
+ // Get the definition from widgets.json file
+ WidgetLayoutInfo targetWidgetLayoutInfo = null;
+ StackId stackId = cluster.getDesiredStackVersion();
+ Map<String, Object> widgetDescriptor = null;
+ StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
+ ServiceInfo serviceInfo = stackInfo.getService("HDFS");
+ File widgetDescriptorFile = serviceInfo.getWidgetsDescriptorFile();
+ if (widgetDescriptorFile != null && widgetDescriptorFile.exists()) {
+ try {
+ widgetDescriptor = gson.fromJson(new FileReader(widgetDescriptorFile), widgetLayoutType);
+ } catch (Exception ex) {
+ String msg = "Error loading widgets from file: " + widgetDescriptorFile;
+ LOG.error(msg, ex);
+ widgetDescriptor = null;
+ }
+ }
+ if (widgetDescriptor != null) {
+ LOG.debug("Loaded widget descriptor: " + widgetDescriptor);
+ for (Object artifact : widgetDescriptor.values()) {
+ List<WidgetLayout> widgetLayouts = (List<WidgetLayout>) artifact;
+ for (WidgetLayout widgetLayout : widgetLayouts) {
+ if (widgetLayout.getLayoutName().equals("default_hdfs_dashboard")) {
+ for (WidgetLayoutInfo layoutInfo : widgetLayout.getWidgetLayoutInfoList()) {
+ if (layoutInfo.getWidgetName().equals(widgetName)) {
+ targetWidgetLayoutInfo = layoutInfo;
+ }
+ }
+ }
+ }
+ }
+ }
+ if (targetWidgetLayoutInfo != null) {
+ entityToUpdate.setMetrics(gson.toJson(targetWidgetLayoutInfo.getMetricsInfo()));
+ entityToUpdate.setWidgetValues(gson.toJson(targetWidgetLayoutInfo.getValues()));
+ widgetDAO.merge(entityToUpdate);
+ } else {
+ LOG.warn("Unable to find widget layout info for " + widgetName +
+ " in the stack: " + stackId);
+ }
+ }
+ }
+ }
+ }
+ }
+
protected void updateHiveConfig() throws AmbariException {
AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
for (final Cluster cluster : getCheckedClusterMap(ambariManagementController.getClusters()).values()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
index 7e93a6e..89aab13 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json
@@ -74,8 +74,16 @@
"is_visible": true,
"metrics": [
{
- "name": "rpc.rpc.NumOpenConnections",
- "metric_path": "metrics/rpc/NumOpenConnections",
+ "name": "rpc.rpc.client.NumOpenConnections",
+ "metric_path": "metrics/rpc/client/NumOpenConnections",
+ "category": "",
+ "service_name": "HDFS",
+ "component_name": "NAMENODE",
+ "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
+ },
+ {
+ "name": "rpc.rpc.datanode.NumOpenConnections",
+ "metric_path": "metrics/rpc/datanode/NumOpenConnections",
"category": "",
"service_name": "HDFS",
"component_name": "NAMENODE",
@@ -84,8 +92,12 @@
],
"values": [
{
- "name": "Open Connections",
- "value": "${rpc.rpc.NumOpenConnections}"
+ "name": "Open Client Connections",
+ "value": "${rpc.rpc.client.NumOpenConnections}"
+ },
+ {
+ "name": "Open Datanode Connections",
+ "value": "${rpc.rpc.datanode.NumOpenConnections}"
}
],
"properties": {
@@ -216,15 +228,29 @@
"is_visible": true,
"metrics": [
{
- "name": "rpc.rpc.RpcQueueTimeAvgTime",
- "metric_path": "metrics/rpc/RpcQueueTime_avg_time",
+ "name": "rpc.rpc.client.RpcQueueTimeAvgTime",
+ "metric_path": "metrics/rpc/client/RpcQueueTime_avg_time",
+ "service_name": "HDFS",
+ "component_name": "NAMENODE",
+ "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
+ },
+ {
+ "name": "rpc.rpc.client.RpcProcessingTimeAvgTime",
+ "metric_path": "metrics/rpc/client/RpcProcessingTime_avg_time",
"service_name": "HDFS",
"component_name": "NAMENODE",
"host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
},
{
- "name": "rpc.rpc.RpcProcessingTimeAvgTime",
- "metric_path": "metrics/rpc/RpcProcessingTime_avg_time",
+ "name": "rpc.rpc.datanode.RpcQueueTimeAvgTime",
+ "metric_path": "metrics/rpc/datanode/RpcQueueTime_avg_time",
+ "service_name": "HDFS",
+ "component_name": "NAMENODE",
+ "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
+ },
+ {
+ "name": "rpc.rpc.datanode.RpcProcessingTimeAvgTime",
+ "metric_path": "metrics/rpc/datanode/RpcProcessingTime_avg_time",
"service_name": "HDFS",
"component_name": "NAMENODE",
"host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
@@ -232,12 +258,20 @@
],
"values": [
{
- "name": "RPC Queue Wait time",
- "value": "${rpc.rpc.RpcQueueTimeAvgTime}"
+ "name": "Client RPC Queue Wait time",
+ "value": "${rpc.rpc.client.RpcQueueTimeAvgTime}"
+ },
+ {
+ "name": "Client RPC Processing time",
+ "value": "${rpc.rpc.client.RpcProcessingTimeAvgTime}"
+ },
+ {
+ "name": "Datanode RPC Queue Wait time",
+ "value": "${rpc.rpc.datanode.RpcQueueTimeAvgTime}"
},
{
- "name": "RPC Processing time",
- "value": "${rpc.rpc.RpcProcessingTimeAvgTime}"
+ "name": "Datanode RPC Processing time",
+ "value": "${rpc.rpc.datanode.RpcProcessingTimeAvgTime}"
}
],
"properties": {
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index dc71408..ff60227 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -215,3 +215,61 @@ net_topology_script_file_path = "/etc/hadoop/conf/topology_script.py"
net_topology_script_dir = os.path.dirname(net_topology_script_file_path)
net_topology_mapping_data_file_name = 'topology_mappings.data'
net_topology_mapping_data_file_path = os.path.join(net_topology_script_dir, net_topology_mapping_data_file_name)
+
+##### Namenode RPC ports - metrics config section start #####
+
+# Figure out the rpc ports for current namenode
+
+nn_rpc_client_port = None
+nn_rpc_dn_port = None
+nn_rpc_healthcheck_port = None
+
+namenode_id = None
+namenode_rpc = None
+
+dfs_ha_enabled = False
+dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None)
+dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)
+
+dfs_ha_namemodes_ids_list = []
+other_namenode_id = None
+
+if dfs_ha_namenode_ids:
+ dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
+ dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
+ if dfs_ha_namenode_ids_array_len > 1:
+ dfs_ha_enabled = True
+
+if dfs_ha_enabled:
+ for nn_id in dfs_ha_namemodes_ids_list:
+ nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
+ if hostname in nn_host:
+ namenode_id = nn_id
+ namenode_rpc = nn_host
+ pass
+ pass
+else:
+ namenode_rpc = default('/configurations/hdfs-site/dfs.namenode.rpc-address', None)
+
+if namenode_rpc:
+ nn_rpc_client_port = namenode_rpc.split(':')[1].strip()
+
+if dfs_ha_enabled:
+ dfs_service_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.servicerpc-address.{namenode_id}'), None)
+ dfs_lifeline_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.lifeline.rpc-address.{namenode_id}'), None)
+else:
+ dfs_service_rpc_address = default('/configurations/hdfs-site/dfs.namenode.servicerpc-address', None)
+ dfs_lifeline_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.lifeline.rpc-address'), None)
+
+if dfs_service_rpc_address:
+ nn_rpc_dn_port = dfs_service_rpc_address.split(':')[1].strip()
+
+if dfs_lifeline_rpc_address:
+ nn_rpc_healthcheck_port = dfs_lifeline_rpc_address.split(':')[1].strip()
+
+is_nn_client_port_configured = False if nn_rpc_client_port is None else True
+is_nn_dn_port_configured = False if nn_rpc_dn_port is None else True
+is_nn_healthcheck_port_configured = False if nn_rpc_healthcheck_port is None else True
+
+##### end #####
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index f9c2164..47b504f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -91,4 +91,15 @@ reducetask.sink.timeline.collector={{metric_collector_protocol}}://{{metric_coll
resourcemanager.sink.timeline.tagsForPrefix.yarn=Queue
+{% if is_nn_client_port_configured %}
+# Namenode rpc ports customization
+namenode.sink.timeline.metric.rpc.client.port={{nn_rpc_client_port}}
+{% endif %}
+{% if is_nn_dn_port_configured %}
+namenode.sink.timeline.metric.rpc.datanode.port={{nn_rpc_dn_port}}
+{% endif %}
+{% if is_nn_healthcheck_port_configured %}
+namenode.sink.timeline.metric.rpc.healthcheck.port={{nn_rpc_healthcheck_port}}
+{% endif %}
+
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
index f4c212c..fda5e79 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java
@@ -118,8 +118,7 @@ public class StackArtifactResourceProviderTest {
Map<String, Object> descriptor = propertyMap.get(ARTIFACT_DATA_PROPERTY_ID + "/HDFS/DATANODE");
Assert.assertNotNull(descriptor);
Assert.assertEquals(1, ((ArrayList) descriptor.get("Component")).size());
- MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get
- ("Component")).iterator().next();
+ MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get("Component")).iterator().next();
Metric m1 = md.getMetrics().get("metrics/dfs/datanode/heartBeats_avg_time");
Metric m2 = md.getMetrics().get("metrics/rpc/closeRegion_num_ops");
@@ -132,6 +131,53 @@ public class StackArtifactResourceProviderTest {
}
@Test
+ public void testGetMetricsDescriptorRpcForNamenode() throws Exception {
+ AmbariManagementController managementController = createNiceMock(AmbariManagementController.class);
+
+ expect(managementController.getAmbariMetaInfo()).andReturn(metaInfo).anyTimes();
+
+ replay(managementController);
+
+ StackArtifactResourceProvider resourceProvider = getStackArtifactResourceProvider(managementController);
+
+ Set<String> propertyIds = new HashSet<String>();
+ propertyIds.add(ARTIFACT_NAME_PROPERTY_ID);
+ propertyIds.add(STACK_NAME_PROPERTY_ID);
+ propertyIds.add(STACK_VERSION_PROPERTY_ID);
+ propertyIds.add(STACK_SERVICE_NAME_PROPERTY_ID);
+ propertyIds.add(ARTIFACT_DATA_PROPERTY_ID);
+
+ Request request = PropertyHelper.getReadRequest(propertyIds);
+
+ Predicate predicate = new PredicateBuilder().property
+ (ARTIFACT_NAME_PROPERTY_ID).equals("metrics_descriptor").and().property
+ (STACK_NAME_PROPERTY_ID).equals("OTHER").and().property
+ (STACK_VERSION_PROPERTY_ID).equals("1.0").and().property
+ (STACK_SERVICE_NAME_PROPERTY_ID).equals("HDFS").toPredicate();
+
+ Set<Resource> resources = resourceProvider.getResources(request, predicate);
+
+ Assert.assertEquals(1, resources.size());
+ Resource resource = resources.iterator().next();
+ Map<String, Map<String, Object>> propertyMap = resource.getPropertiesMap();
+ Map<String, Object> descriptor = propertyMap.get(ARTIFACT_DATA_PROPERTY_ID + "/HDFS/NAMENODE");
+ Assert.assertNotNull(descriptor);
+ Assert.assertEquals(2, ((ArrayList) descriptor.get("Component")).size());
+ MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get("Component")).iterator().next();
+
+ Assert.assertEquals("rpcdetailed.rpcdetailed.client.BlockReceivedAndDeletedAvgTime",
+ md.getMetrics().get("metrics/rpcdetailed/client/blockReceived_avg_time").getName());
+
+ Assert.assertEquals("rpc.rpc.healthcheck.CallQueueLength",
+ md.getMetrics().get("metrics/rpc/healthcheck/callQueueLen").getName());
+
+ Assert.assertEquals("rpcdetailed.rpcdetailed.datanode.DeleteNumOps",
+ md.getMetrics().get("metrics/rpcdetailed/datanode/delete_num_ops").getName());
+
+ verify(managementController);
+ }
+
+ @Test
@SuppressWarnings("unchecked")
public void testGetWidgetDescriptorForService() throws Exception {
AmbariManagementController managementController = createNiceMock(AmbariManagementController.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
index d450177..2beb462 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java
@@ -302,6 +302,20 @@ public class PropertyHelperTest {
}
}
+ @Test
+ public void testInsertTagIntoMetricName() {
+ Assert.assertEquals("rpc.rpc.client.CallQueueLength",
+ PropertyHelper.insertTagInToMetricName("client", "rpc.rpc.CallQueueLength", "rpc.rpc"));
+
+ Assert.assertEquals("rpc.rpc.client.CallQueueLength",
+ PropertyHelper.insertTagInToMetricName("client", "rpc.rpc.CallQueueLength", "rpc.rpc."));
+
+ Assert.assertEquals("metrics/rpc/client/CallQueueLen",
+ PropertyHelper.insertTagInToMetricName("client", "metrics/rpc/CallQueueLen", "rpc.rpc"));
+
+ Assert.assertEquals("metrics/rpc/client/CallQueueLen",
+ PropertyHelper.insertTagInToMetricName("client", "metrics/rpc/CallQueueLen", "rpc.rpc."));
+ }
// remove any replacement tokens (e.g. $1.replaceAll(\",q(\\d+)=\",\"/\").substring(1)) in the metric names
private static Map<String, Map<String, PropertyInfo>> normalizeMetricNames(Map<String, Map<String, PropertyInfo>> gids) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/75d284ab/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
index b88bc09..7a95721 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java
@@ -42,24 +42,34 @@ import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.dao.WidgetDAO;
import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.orm.entities.WidgetEntity;
+import org.apache.ambari.server.stack.StackManagerFactory;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.commons.io.FileUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import javax.persistence.EntityManager;
+import java.io.File;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -83,7 +93,8 @@ public class UpgradeCatalog222Test {
private UpgradeCatalogHelper upgradeCatalogHelper;
private StackEntity desiredStackEntity;
-
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void init() {
@@ -114,16 +125,18 @@ public class UpgradeCatalog222Test {
Method updateAMSConfigs = UpgradeCatalog222.class.getDeclaredMethod("updateAMSConfigs");
Method updateHiveConfigs = UpgradeCatalog222.class.getDeclaredMethod("updateHiveConfig");
Method updateHostRoleCommands = UpgradeCatalog222.class.getDeclaredMethod("updateHostRoleCommands");
+ Method updateHDFSWidget = UpgradeCatalog222.class.getDeclaredMethod("updateHDFSWidgetDefinition");
UpgradeCatalog222 upgradeCatalog222 = createMockBuilder(UpgradeCatalog222.class)
- .addMockedMethod(addNewConfigurationsFromXml)
- .addMockedMethod(updateAlerts)
- .addMockedMethod(updateStormConfigs)
- .addMockedMethod(updateAMSConfigs)
- .addMockedMethod(updateHiveConfigs)
- .addMockedMethod(updateHostRoleCommands)
- .createMock();
+ .addMockedMethod(addNewConfigurationsFromXml)
+ .addMockedMethod(updateAlerts)
+ .addMockedMethod(updateStormConfigs)
+ .addMockedMethod(updateAMSConfigs)
+ .addMockedMethod(updateHiveConfigs)
+ .addMockedMethod(updateHostRoleCommands)
+ .addMockedMethod(updateHDFSWidget)
+ .createMock();
upgradeCatalog222.addNewConfigurationsFromXml();
expectLastCall().once();
@@ -137,6 +150,8 @@ public class UpgradeCatalog222Test {
expectLastCall().once();
upgradeCatalog222.updateHiveConfig();
expectLastCall().once();
+ upgradeCatalog222.updateHDFSWidgetDefinition();
+ expectLastCall().once();
replay(upgradeCatalog222);
@@ -176,8 +191,8 @@ public class UpgradeCatalog222Test {
expect(mockAlertDefinitionDAO.findByName(eq(clusterId), eq("yarn_app_timeline_server_webui")))
.andReturn(mockATSWebAlert).atLeastOnce();
expect(mockATSWebAlert.getSource()).andReturn("{\"uri\": {\n" +
- " \"http\": \"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\n" +
- " \"https\": \"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\" } }");
+ " \"http\": \"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\n" +
+ " \"https\": \"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\" } }");
mockATSWebAlert.setSource("{\"uri\":{\"http\":\"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\"https\":\"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\"}}");
expectLastCall().once();
@@ -255,7 +270,6 @@ public class UpgradeCatalog222Test {
easyMockSupport.verifyAll();
}
-
@Test
public void testAmsSiteUpdateConfigs() throws Exception{
@@ -331,7 +345,60 @@ public class UpgradeCatalog222Test {
ConfigurationRequest configurationRequest = configurationRequestCapture.getValue();
Map<String, String> updatedProperties = configurationRequest.getProperties();
assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual());
+ }
+ @Test
+ public void testHDFSWidgetUpdate() throws Exception {
+ final Clusters clusters = createNiceMock(Clusters.class);
+ final Cluster cluster = createNiceMock(Cluster.class);
+ final AmbariManagementController controller = createNiceMock(AmbariManagementController.class);
+ final Gson gson = new Gson();
+ final WidgetDAO widgetDAO = createNiceMock(WidgetDAO.class);
+ final AmbariMetaInfo metaInfo = createNiceMock(AmbariMetaInfo.class);
+ WidgetEntity widgetEntity = createNiceMock(WidgetEntity.class);
+ StackId stackId = new StackId("HDP", "2.0.0");
+ StackInfo stackInfo = createNiceMock(StackInfo.class);
+ ServiceInfo serviceInfo = createNiceMock(ServiceInfo.class);
+
+ String widgetStr = "{\"layouts\":[{\"layout_name\":\"default_hdfs_dashboard\",\"display_name\":\"Standard HDFS Dashboard\",\"section_name\":\"HDFS_SUMMARY\",\"widgetLayoutInfo\":[{\"widget_name\":\"NameNode RPC\",\"metrics\":[],\"values\":[]}]}]}";
+
+ File dataDirectory = temporaryFolder.newFolder();
+ File file = new File(dataDirectory, "hdfs_widget.json");
+ FileUtils.writeStringToFile(file, widgetStr);
+
+ final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class));
+ bind(AmbariManagementController.class).toInstance(controller);
+ bind(Clusters.class).toInstance(clusters);
+ bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+ bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+ bind(Gson.class).toInstance(gson);
+ bind(WidgetDAO.class).toInstance(widgetDAO);
+ bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class));
+ bind(AmbariMetaInfo.class).toInstance(metaInfo);
+ }
+ });
+ expect(controller.getClusters()).andReturn(clusters).anyTimes();
+ expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+ put("normal", cluster);
+ }}).anyTimes();
+ expect(cluster.getClusterId()).andReturn(1L).anyTimes();
+ expect(widgetDAO.findByName(1L, "NameNode RPC", "ambari", "HDFS_SUMMARY"))
+ .andReturn(Collections.singletonList(widgetEntity));
+ expect(cluster.getDesiredStackVersion()).andReturn(stackId);
+ expect(metaInfo.getStack("HDP", "2.0.0")).andReturn(stackInfo);
+ expect(stackInfo.getService("HDFS")).andReturn(serviceInfo);
+ expect(serviceInfo.getWidgetsDescriptorFile()).andReturn(file);
+ expect(widgetDAO.merge(widgetEntity)).andReturn(null);
+ expect(widgetEntity.getWidgetName()).andReturn("Namenode RPC").anyTimes();
+
+ replay(clusters, cluster, controller, widgetDAO, metaInfo, widgetEntity, stackInfo, serviceInfo);
+
+ mockInjector.getInstance(UpgradeCatalog222.class).updateHDFSWidgetDefinition();
+
+ verify(clusters, cluster, controller, widgetDAO, widgetEntity, stackInfo, serviceInfo);
}
@Test