You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/02/03 01:37:25 UTC
ambari git commit: AMBARI-14882. AMS aggregates Counter values as
average over the timeseries (and other issues). (swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-2.2 76d54b46d -> 17afb926f
AMBARI-14882. AMS aggregates Counter values as average over the timeseries (and other issues). (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17afb926
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17afb926
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17afb926
Branch: refs/heads/branch-2.2
Commit: 17afb926f7b262ef7a978dc020a718329fc01900
Parents: 76d54b4
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Feb 2 16:36:40 2016 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Feb 2 16:36:48 2016 -0800
----------------------------------------------------------------------
.../timeline/AbstractTimelineMetricsSink.java | 4 +-
.../metrics2/sink/timeline/TimelineMetric.java | 13 ++-
.../sink/timeline/TimelineMetricMetadata.java | 15 +--
.../timeline/cache/TimelineMetricsCache.java | 9 +-
.../sink/flume/FlumeTimelineMetricsSink.java | 7 +-
.../timeline/HadoopTimelineMetricsSink.java | 6 +-
.../timeline/HadoopTimelineMetricsSinkTest.java | 5 +-
.../kafka/KafkaTimelineMetricsReporter.java | 16 +--
.../storm/StormTimelineMetricsReporter.java | 2 -
.../timeline/HBaseTimelineMetricStore.java | 19 ++--
.../metrics/timeline/PhoenixHBaseAccessor.java | 111 +++++++------------
.../metrics/timeline/aggregators/Function.java | 75 ++++++++++---
.../aggregators/TimelineMetricReadHelper.java | 38 +++++++
.../TimelineMetricMetadataManager.java | 5 +-
.../metrics/timeline/FunctionTest.java | 10 +-
.../timeline/HBaseTimelineMetricStoreTest.java | 31 +++++-
16 files changed, 233 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 6d7c55f..9173889 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -37,6 +37,7 @@ public abstract class AbstractTimelineMetricsSink {
public static final String COLLECTOR_HOST_PROPERTY = "collector";
public static final String COLLECTOR_PORT_PROPERTY = "port";
public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
+ public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative";
protected final Log LOG;
@@ -60,8 +61,7 @@ public abstract class AbstractTimelineMetricsSink {
try {
String jsonData = mapper.writeValueAsString(metrics);
- HttpURLConnection connection =
- (HttpURLConnection) new URL(connectUrl).openConnection();
+ HttpURLConnection connection = (HttpURLConnection) new URL(connectUrl).openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index e4dc423..98f4978 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -42,6 +42,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
private long timestamp;
private long startTime;
private String type;
+ private String units;
private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
// default
@@ -53,6 +54,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
public TimelineMetric(TimelineMetric metric) {
setMetricName(metric.getMetricName());
setType(metric.getType());
+ setUnits(metric.getUnits());
setTimestamp(metric.getTimestamp());
setAppId(metric.getAppId());
setInstanceId(metric.getInstanceId());
@@ -115,7 +117,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
this.startTime = startTime;
}
- @XmlElement(name = "type")
+ @XmlElement(name = "type", defaultValue = "UNDEFINED")
public String getType() {
return type;
}
@@ -124,6 +126,15 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
this.type = type;
}
+ @XmlElement(name = "units")
+ public String getUnits() {
+ return units;
+ }
+
+ public void setUnits(String units) {
+ this.units = units;
+ }
+
@XmlElement(name = "metrics")
public TreeMap<Long, Double> getMetricValues() {
return metricValues;
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
index 0624f9c..1f413a0 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
@@ -33,16 +33,17 @@ public class TimelineMetricMetadata {
private String metricName;
private String appId;
private String units;
- private MetricType type = MetricType.UNDEFINED;
+ private String type = "UNDEFINED";
private Long seriesStartTime;
boolean supportsAggregates = true;
// Serialization ignored helper flag
boolean isPersisted = false;
+ // Placeholder to add more type later
public enum MetricType {
- GAUGE, // Can vary in both directions
- COUNTER, // Single dimension
- UNDEFINED // Default
+ GAUGE,
+ COUNTER,
+ UNDEFINED
}
// Default constructor
@@ -50,7 +51,7 @@ public class TimelineMetricMetadata {
}
public TimelineMetricMetadata(String metricName, String appId, String units,
- MetricType type, Long seriesStartTime,
+ String type, Long seriesStartTime,
boolean supportsAggregates) {
this.metricName = metricName;
this.appId = appId;
@@ -89,11 +90,11 @@ public class TimelineMetricMetadata {
}
@XmlElement(name = "type")
- public MetricType getType() {
+ public String getType() {
return type;
}
- public void setType(MetricType type) {
+ public void setType(String type) {
this.type = type;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 4e9e36e..15bd5f4 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -38,11 +38,18 @@ public class TimelineMetricsCache {
public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
private final int maxRecsPerName;
private final int maxEvictionTimeInMillis;
+ private boolean skipCounterTransform = true;
private final Map<String, Double> counterMetricLastValue = new HashMap<String, Double>();
public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
+ this(maxRecsPerName, maxEvictionTimeInMillis, false);
+ }
+
+ public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis,
+ boolean skipCounterTransform) {
this.maxRecsPerName = maxRecsPerName;
this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
+ this.skipCounterTransform = skipCounterTransform;
}
class TimelineMetricWrapper {
@@ -171,7 +178,7 @@ public class TimelineMetricsCache {
}
public void putTimelineMetric(TimelineMetric timelineMetric, boolean isCounter) {
- if (isCounter) {
+ if (isCounter && !skipCounterTransform) {
transformMetricValuesToDerivative(timelineMetric);
}
putTimelineMetric(timelineMetric);
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 0257ada..cf2b4ae 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -135,12 +135,11 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
public void run() {
LOG.debug("Collecting Metrics for Flume");
try {
- Map<String, Map<String, String>> metricsMap =
- JMXPollUtil.getAllMBeans();
+ Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
long currentTimeMillis = System.currentTimeMillis();
for (String component : metricsMap.keySet()) {
Map<String, String> attributeMap = metricsMap.get(component);
- LOG.info("Attributes for component " + component);
+ LOG.debug("Attributes for component " + component);
processComponentAttributes(currentTimeMillis, component, attributeMap);
}
} catch (UnableToConnectException uce) {
@@ -188,8 +187,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
timelineMetric.setInstanceId(component);
timelineMetric.setAppId("FLUME_HANDLER");
timelineMetric.setStartTime(currentTimeMillis);
- timelineMetric.setType(ClassUtils.getShortCanonicalName(
- attributeValue, "Number"));
timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
return timelineMetric;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index f23dc42..000b82e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -96,7 +96,9 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min
- metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+ // Skip aggregation of counter values by calculating derivative
+ metricsCache = new TimelineMetricsCache(maxRowCacheSize,
+ metricsSendInterval, conf.getBoolean(SKIP_COUNTER_TRANSFROMATION, true));
conf.setListDelimiter(',');
Iterator<String> it = (Iterator<String>) conf.getKeys();
@@ -186,7 +188,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
timelineMetric.setHostName(hostName);
timelineMetric.setAppId(serviceName);
timelineMetric.setStartTime(startTime);
- timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number"));
+ timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
timelineMetric.getMetricValues().put(startTime, value.doubleValue());
// Put intermediate values into the cache until it is time to send
boolean isCounter = MetricType.COUNTER == metric.type();
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index a69b7c7..6b23f36 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -40,6 +40,7 @@ import java.util.List;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -90,6 +91,7 @@ public class HadoopTimelineMetricsSinkTest {
AbstractMetric metric = createNiceMock(AbstractMetric.class);
expect(metric.name()).andReturn("metricName").anyTimes();
expect(metric.value()).andReturn(9.5687).anyTimes();
+ expect(metric.type()).andReturn(MetricType.COUNTER).anyTimes();
//TODO currently only numeric metrics are supported
MetricsRecord record = createNiceMock(MetricsRecord.class);
@@ -104,7 +106,6 @@ public class HadoopTimelineMetricsSinkTest {
expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
-
replay(conf, record, metric);
sink.init(conf);
@@ -239,4 +240,6 @@ public class HadoopTimelineMetricsSinkTest {
Assert.assertEquals(new Double(5.0), values.next());
Assert.assertEquals(new Double(6.0), values.next());
}
+
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index ff2db1d..4915435 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -51,7 +50,7 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
+import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType;
import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
@@ -280,7 +279,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
String[] metricNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, meter);
- populateMetricsList(context, metricNames);
+ populateMetricsList(context, MetricType.GAUGE, metricNames);
}
@Override
@@ -291,7 +290,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
final String metricCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
COUNT_SUFIX, counter.count());
- populateMetricsList(context, metricCountName);
+ populateMetricsList(context, MetricType.COUNTER, metricCountName);
}
@Override
@@ -305,7 +304,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
String[] metricNames = (String[]) ArrayUtils.addAll(metricHNames, metricSNames);
- populateMetricsList(context, metricNames);
+ populateMetricsList(context, MetricType.GAUGE, metricNames);
}
@Override
@@ -321,7 +320,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
String[] metricNames = (String[]) ArrayUtils.addAll(metricMNames, metricTNames);
metricNames = (String[]) ArrayUtils.addAll(metricNames, metricSNames);
- populateMetricsList(context, metricNames);
+ populateMetricsList(context, MetricType.GAUGE, metricNames);
}
@Override
@@ -331,7 +330,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
- populateMetricsList(context, sanitizedName);
+ populateMetricsList(context, MetricType.GAUGE, sanitizedName);
}
private String[] cacheKafkaMetered(long currentTimeMillis, String sanitizedName, Metered meter) {
@@ -393,10 +392,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
return meterName;
}
- private void populateMetricsList(Context context, String... metricNames) {
+ private void populateMetricsList(Context context, MetricType type, String... metricNames) {
for (String metricName : metricNames) {
TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);
if (cachedMetric != null) {
+ cachedMetric.setType(type.name());
context.getTimelineMetricList().add(cachedMetric);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 73e3de8..f054f16 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -153,8 +153,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
timelineMetric.setHostName(hostname);
timelineMetric.setAppId(component);
timelineMetric.setStartTime(currentTimeMillis);
- timelineMetric.setType(ClassUtils.getShortCanonicalName(
- attributeValue, "Number"));
timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
return timelineMetric;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index c30a354..5ee8b44 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -202,16 +202,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
for (TimelineMetric metric : metricsList){
String name = metric.getMetricName();
if (name.contains("._rate")){
- updateValueAsRate(metric.getMetricValues());
+ updateValuesAsRate(metric.getMetricValues());
}
}
return metrics;
}
- private Map<Long, Double> updateValueAsRate(Map<Long, Double> metricValues) {
+ static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues) {
Long prevTime = null;
+ Double prevVal = null;
long step;
+ Double diff;
for (Map.Entry<Long, Double> timeValueEntry : metricValues.entrySet()) {
Long currTime = timeValueEntry.getKey();
@@ -219,21 +221,22 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
if (prevTime != null) {
step = currTime - prevTime;
- Double rate = currVal / TimeUnit.MILLISECONDS.toSeconds(step);
+ diff = currVal - prevVal;
+ Double rate = diff / TimeUnit.MILLISECONDS.toSeconds(step);
timeValueEntry.setValue(rate);
} else {
timeValueEntry.setValue(0.0);
}
prevTime = currTime;
+ prevVal = currVal;
}
return metricValues;
}
- public static HashMap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
- HashMap<String, List<Function>> metricsFunctions = new HashMap<String,
- List<Function>>();
+ static HashMap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
+ HashMap<String, List<Function>> metricsFunctions = new HashMap<>();
for (String metricName : metricNames){
Function function = Function.DEFAULT_VALUE_FUNCTION;
@@ -242,7 +245,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
try {
function = Function.fromMetricName(metricName);
int functionStartIndex = metricName.indexOf("._");
- if(functionStartIndex > 0 ) {
+ if (functionStartIndex > 0) {
cleanMetricName = metricName.substring(0, functionStartIndex);
}
} catch (Function.FunctionFormatException ffe){
@@ -252,7 +255,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
List<Function> functionsList = metricsFunctions.get(cleanMetricName);
if (functionsList == null) {
- functionsList = new ArrayList<Function>(1);
+ functionsList = new ArrayList<>(1);
}
functionsList.add(function);
metricsFunctions.put(cleanMetricName, functionsList);
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 53eb9d2..5067454 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import com.google.common.base.Enums;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,7 +65,6 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.*;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -164,51 +162,12 @@ public class PhoenixHBaseAccessor {
}
private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
- throws SQLException, IOException {
+ throws SQLException, IOException {
TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs);
metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
return metric;
}
- public static SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(
- ResultSet rs, Function f) throws SQLException, IOException {
-
- SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
- rs.getString("METRIC_NAME") + f.getSuffix(),
- rs.getString("APP_ID"),
- rs.getString("INSTANCE_ID"),
- rs.getString("HOSTNAME"),
- rs.getLong("SERVER_TIME"),
- rs.getLong("SERVER_TIME"),
- rs.getString("UNITS")
- );
-
- // get functions for metricnames
-
- double value;
- switch(f.getReadFunction()){
- case AVG:
- value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
- break;
- case MIN:
- value = rs.getDouble("METRIC_MIN");
- break;
- case MAX:
- value = rs.getDouble("METRIC_MAX");
- break;
- case SUM:
- value = rs.getDouble("METRIC_SUM");
- break;
- default:
- value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
- break;
- }
-
- metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
-
- return metric;
- }
-
private static TreeMap<Long, Double> readLastMetricValueFromJSON(String json)
throws IOException {
TreeMap<Long, Double> values = readMetricFromJSON(json);
@@ -434,7 +393,7 @@ public class PhoenixHBaseAccessor {
metricRecordStmt.setString(4, metric.getInstanceId());
metricRecordStmt.setLong(5, currentTime);
metricRecordStmt.setLong(6, metric.getStartTime());
- metricRecordStmt.setString(7, metric.getType());
+ metricRecordStmt.setString(7, metric.getUnits());
metricRecordStmt.setDouble(8, aggregates[0]);
metricRecordStmt.setDouble(9, aggregates[1]);
metricRecordStmt.setDouble(10, aggregates[2]);
@@ -496,7 +455,7 @@ public class PhoenixHBaseAccessor {
try {
//get latest
- if(condition.isPointInTime()){
+ if (condition.isPointInTime()){
getLatestMetricRecords(condition, conn, metrics);
} else {
if (condition.getEndTime() >= condition.getStartTime()) {
@@ -578,19 +537,24 @@ public class PhoenixHBaseAccessor {
return metrics;
}
- private void appendMetricFromResultSet(
- TimelineMetrics metrics, Condition condition, Map<String,
- List<Function>> metricFunctions, ResultSet rs)
- throws SQLException, IOException {
- if (condition.getPrecision() == Precision.HOURS
- || condition.getPrecision() == Precision.MINUTES
- || condition.getPrecision() == Precision.DAYS) {
-
- String metricName = rs.getString("METRIC_NAME");
- List<Function> functions = metricFunctions.get(metricName);
+ /**
+ * Apply aggregate function to the result if supplied else get precision
+ * or aggregate data with default function applied.
+ */
+ private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition,
+ Map<String, List<Function>> metricFunctions,
+ ResultSet rs) throws SQLException, IOException {
+ String metricName = rs.getString("METRIC_NAME");
+ List<Function> functions = metricFunctions.get(metricName);
+ // Apply aggregation function if present
+ if (functions != null && !functions.isEmpty()) {
+ if (functions.size() > 1) {
+ throw new IllegalArgumentException("Multiple aggregate functions not supported.");
+ }
for (Function f : functions) {
- SingleValuedTimelineMetric metric = getAggregatedTimelineMetricFromResultSet(rs, f);
+ SingleValuedTimelineMetric metric =
+ TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
if (condition.isGrouped()) {
metrics.addOrMergeTimelineMetric(metric);
@@ -598,28 +562,35 @@ public class PhoenixHBaseAccessor {
metrics.getMetrics().add(metric.getTimelineMetric());
}
}
- }
- else {
- TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
+ } else {
+ // No aggregation requested
+ if (condition.getPrecision().equals(Precision.SECONDS)) {
+ TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
+ if (condition.isGrouped()) {
+ metrics.addOrMergeTimelineMetric(metric);
+ } else {
+ metrics.getMetrics().add(metric);
+ }
- if (condition.isGrouped()) {
- metrics.addOrMergeTimelineMetric(metric);
} else {
- metrics.getMetrics().add(metric);
+ SingleValuedTimelineMetric metric =
+ TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs,
+ Function.DEFAULT_VALUE_FUNCTION);
+ if (condition.isGrouped()) {
+ metrics.addOrMergeTimelineMetric(metric);
+ } else {
+ metrics.getMetrics().add(metric.getTimelineMetric());
+ }
}
}
}
- private void getLatestMetricRecords(
- Condition condition, Connection conn, TimelineMetrics metrics)
- throws SQLException, IOException {
+ private void getLatestMetricRecords(Condition condition, Connection conn,
+ TimelineMetrics metrics) throws SQLException, IOException {
validateConditionIsNotEmpty(condition);
- PreparedStatement stmt;
-
- stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn,
- condition);
+ PreparedStatement stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, condition);
ResultSet rs = null;
try {
rs = stmt.executeQuery();
@@ -1137,7 +1108,7 @@ public class PhoenixHBaseAccessor {
stmt.setString(1, metadata.getMetricName());
stmt.setString(2, metadata.getAppId());
stmt.setString(3, metadata.getUnits());
- stmt.setString(4, metadata.getType().name());
+ stmt.setString(4, metadata.getType());
stmt.setLong(5, metadata.getSeriesStartTime());
stmt.setBoolean(6, metadata.isSupportsAggregates());
@@ -1230,7 +1201,7 @@ public class PhoenixHBaseAccessor {
metricName,
appId,
rs.getString("UNITS"),
- Enums.getIfPresent(MetricType.class, rs.getString("TYPE")).or(MetricType.UNDEFINED),
+ rs.getString("TYPE"),
rs.getLong("START_TIME"),
rs.getBoolean("SUPPORTS_AGGREGATION")
);
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
index 8292657..6f408a5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import java.util.Arrays;
+
/**
* Is used to determine metrics aggregate table.
*
@@ -24,8 +26,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
* @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics
*/
public class Function {
- public static Function DEFAULT_VALUE_FUNCTION =
- new Function(ReadFunction.VALUE, null);
+ public static Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null);
private static final String SUFFIX_SEPARATOR = "\\._";
private ReadFunction readFunction = ReadFunction.VALUE;
@@ -42,7 +43,13 @@ public class Function {
this.postProcessingFunction = ppFunction;
}
- public static Function fromMetricName(String metricName){
+ /**
+ * Segregate post processing function eg: rate from aggregate function,
+ * example: avg, in any order
+ * @param metricName metric name from request
+ * @return @Function
+ */
+ public static Function fromMetricName(String metricName) {
// gets postprocessing, and aggregation function
// ex. Metric._rate._avg
String[] parts = metricName.split(SUFFIX_SEPARATOR);
@@ -50,14 +57,31 @@ public class Function {
ReadFunction readFunction = ReadFunction.VALUE;
PostProcessingFunction ppFunction = null;
- if (parts.length == 3) {
- ppFunction = PostProcessingFunction.getFunction(parts[1]);
- readFunction = ReadFunction.getFunction(parts[2]);
- } else if (parts.length == 2) {
- ppFunction = null;
- readFunction = ReadFunction.getFunction(parts[1]);
+ if (parts.length <= 1) {
+ return new Function(readFunction, null);
+ }
+ if (parts.length > 3) {
+ throw new IllegalArgumentException("Invalid number of functions specified.");
+ }
+
+ // Parse functions
+ boolean isSuccessful = false; // Best effort
+ for (String part : parts) {
+ if (ReadFunction.isPresent(part)) {
+ readFunction = ReadFunction.getFunction(part);
+ isSuccessful = true;
+ }
+ if (PostProcessingFunction.isPresent(part)) {
+ ppFunction = PostProcessingFunction.getFunction(part);
+ isSuccessful = true;
}
+ }
+ // Throw exception if parsing failed
+ if (!isSuccessful) {
+ throw new FunctionFormatException("Could not parse provided functions: " +
+ "" + Arrays.asList(parts));
+ }
return new Function(readFunction, ppFunction);
}
@@ -113,8 +137,16 @@ public class Function {
return suffix;
}
- public static PostProcessingFunction getFunction(String functionName) throws
- FunctionFormatException {
+ public static boolean isPresent(String functionName) {
+ try {
+ PostProcessingFunction.valueOf(functionName.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ return true;
+ }
+
+ public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException {
if (functionName == null) {
return NONE;
}
@@ -122,8 +154,7 @@ public class Function {
try {
return PostProcessingFunction.valueOf(functionName.toUpperCase());
} catch (IllegalArgumentException e) {
- throw new FunctionFormatException("Function should be value, avg, min, " +
- "max", e);
+ throw new FunctionFormatException("Function should be ._rate", e);
}
}
}
@@ -145,8 +176,16 @@ public class Function {
return suffix;
}
- public static ReadFunction getFunction(String functionName) throws
- FunctionFormatException {
+ public static boolean isPresent(String functionName) {
+ try {
+ ReadFunction.valueOf(functionName.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ return true;
+ }
+
+ public static ReadFunction getFunction(String functionName) throws FunctionFormatException {
if (functionName == null) {
return VALUE;
}
@@ -154,12 +193,16 @@ public class Function {
return ReadFunction.valueOf(functionName.toUpperCase());
} catch (IllegalArgumentException e) {
throw new FunctionFormatException(
- "Function should be value, avg, min, max. Got " + functionName, e);
+ "Function should be sum, avg, min, max. Got " + functionName, e);
}
}
}
public static class FunctionFormatException extends IllegalArgumentException {
+ public FunctionFormatException(String message) {
+ super(message);
+ }
+
public FunctionFormatException(String message, Throwable cause) {
super(message, cause);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index c5e60fe..846ae92 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
@@ -46,6 +47,43 @@ public class TimelineMetricReadHelper {
return metric;
}
+ public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs,
+ Function f) throws SQLException, IOException {
+
+ SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
+ rs.getString("METRIC_NAME") + f.getSuffix(),
+ rs.getString("APP_ID"),
+ rs.getString("INSTANCE_ID"),
+ rs.getString("HOSTNAME"),
+ rs.getLong("SERVER_TIME"),
+ rs.getLong("SERVER_TIME"),
+ rs.getString("UNITS")
+ );
+
+ double value;
+ switch(f.getReadFunction()){
+ case AVG:
+ value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+ break;
+ case MIN:
+ value = rs.getDouble("METRIC_MIN");
+ break;
+ case MAX:
+ value = rs.getDouble("METRIC_MAX");
+ break;
+ case SUM:
+ value = rs.getDouble("METRIC_SUM");
+ break;
+ default:
+ value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+ break;
+ }
+
+ metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
+
+ return metric;
+ }
+
/**
* Returns common part of timeline metrics record without the values.
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index 1c1a1dc..8e58203 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType.UNDEFINED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY;
@@ -166,8 +165,8 @@ public class TimelineMetricMetadataManager {
return new TimelineMetricMetadata(
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
- timelineMetric.getType(), // Present type and unit are synonyms
- UNDEFINED, // TODO: Add support for types in the application
+ timelineMetric.getUnits(),
+ timelineMetric.getType(),
timelineMetric.getStartTime(),
true
);
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
index af9c6bb..46bc6f8 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.junit.Ignore;
import org.junit.Test;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.fromMetricName;
@@ -32,17 +33,20 @@ public class FunctionTest {
Function f = fromMetricName("Metric._avg");
assertThat(f).isEqualTo(new Function(AVG, null));
-
f = fromMetricName("Metric._rate._avg");
assertThat(f).isEqualTo(new Function(AVG, RATE));
f = fromMetricName("bytes_in");
assertThat(f).isEqualTo(Function.DEFAULT_VALUE_FUNCTION);
- }
+ // Rate support without aggregates
+ f = fromMetricName("Metric._rate");
+ assertThat(f).isEqualTo(new Function(null, RATE));
+ }
+ @Ignore // If unknown function: behavior is best effort query without function
@Test(expected = Function.FunctionFormatException.class)
public void testNotAFunction() throws Exception {
- Function f = fromMetricName("bytes._not._afunction");
+ fromMetricName("bytes._not._afunction");
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/17afb926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
index 8233b3f..512a7db 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import junit.framework.Assert;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
@@ -32,8 +35,7 @@ public class HBaseTimelineMetricStoreTest {
public static final String MEM_METRIC = "mem";
public static final String BYTES_IN_METRIC = "bytes_in";
- public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not" +
- "._afunction";
+ public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
@Test
public void testParseMetricNamesToAggregationFunctions() throws Exception {
@@ -45,8 +47,8 @@ public class HBaseTimelineMetricStoreTest {
BYTES_NOT_AFUNCTION_METRIC);
//when
- HashMap<String, List<Function>> mfm = HBaseTimelineMetricStore
- .parseMetricNamesToAggregationFunctions(metricNames);
+ HashMap<String, List<Function>> mfm =
+ HBaseTimelineMetricStore.parseMetricNamesToAggregationFunctions(metricNames);
//then
assertThat(mfm).hasSize(3)
@@ -63,4 +65,25 @@ public class HBaseTimelineMetricStoreTest {
.contains(Function.DEFAULT_VALUE_FUNCTION);
}
+
+ @Test
+ public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
+ Map<Long, Double> metricValues = new TreeMap<>();
+ metricValues.put(1454016368371L, 1011.25);
+ metricValues.put(1454016428371L, 1011.25);
+ metricValues.put(1454016488371L, 1011.25);
+ metricValues.put(1454016548371L, 1011.25);
+ metricValues.put(1454016608371L, 1011.25);
+ metricValues.put(1454016668371L, 1011.25);
+ metricValues.put(1454016728371L, 1011.25);
+
+ // Calculate rate
+ Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues));
+
+ // Make sure rate is zero
+ for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
+ Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
+ + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
+ }
+ }
}