You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/01/05 23:35:48 UTC
ambari git commit: AMBARI-8994. AMS : Yarn service - RPC metrics
returns duplicate array elements. (swagle)
Repository: ambari
Updated Branches:
refs/heads/trunk 83b8ab969 -> dec5f7d69
AMBARI-8994. AMS : Yarn service - RPC metrics returns duplicate array elements. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dec5f7d6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dec5f7d6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dec5f7d6
Branch: refs/heads/trunk
Commit: dec5f7d69c43db12a4080d6e65de8be102936fb1
Parents: 83b8ab9
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Mon Jan 5 14:19:27 2015 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Mon Jan 5 14:19:34 2015 -0800
----------------------------------------------------------------------
.../timeline/AbstractTimelineMetricsSink.java | 83 +++++++++++
.../base/AbstractTimelineMetricsSink.java | 79 -----------
.../timeline/cache/TimelineMetricsCache.java | 30 +++-
.../sink/flume/FlumeTimelineMetricsSink.java | 2 +-
.../timeline/HadoopTimelineMetricsSink.java | 11 +-
.../timeline/HadoopTimelineMetricsSinkTest.java | 142 ++++++++++++++++++-
.../sink/storm/StormTimelineMetricsSink.java | 2 +-
7 files changed, 257 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
new file mode 100644
index 0000000..a382ccb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+public abstract class AbstractTimelineMetricsSink {
+ public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
+ public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
+ public static final String METRICS_SEND_INTERVAL = "sendInterval";
+ public static final String COLLECTOR_HOST_PROPERTY = "collector";
+
+ protected final Log LOG;
+ private HttpClient httpClient = new HttpClient();
+
+ protected static ObjectMapper mapper;
+
+ static {
+ mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.getSerializationConfig()
+ .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ }
+
+ public AbstractTimelineMetricsSink() {
+ LOG = LogFactory.getLog(this.getClass());
+ }
+
+ protected void emitMetrics(TimelineMetrics metrics) throws IOException {
+ String jsonData = mapper.writeValueAsString(metrics);
+
+ SocketAddress socketAddress = getServerSocketAddress();
+
+ if (socketAddress != null) {
+ StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
+
+ PostMethod postMethod = new PostMethod(getCollectorUri());
+ postMethod.setRequestEntity(requestEntity);
+ int statusCode = httpClient.executeMethod(postMethod);
+ if (statusCode != 200) {
+ LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
+ } else {
+ LOG.debug("Metrics posted to Collector " + getCollectorUri());
+ }
+ }
+ }
+
+ public void setHttpClient(HttpClient httpClient) {
+ this.httpClient = httpClient;
+ }
+
+ abstract protected SocketAddress getServerSocketAddress();
+
+ abstract protected String getCollectorUri();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
deleted file mode 100644
index d51ee67..0000000
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline.base;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.httpclient.methods.StringRequestEntity;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-
-public abstract class AbstractTimelineMetricsSink {
- public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
- public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
- public static final String METRICS_SEND_INTERVAL = "sendInterval";
- public static final String COLLECTOR_HOST_PROPERTY = "collector";
-
- protected final Log LOG = LogFactory.getLog(this.getClass());
- private HttpClient httpClient = new HttpClient();
-
- protected static ObjectMapper mapper;
-
- static {
- mapper = new ObjectMapper();
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
- mapper.setAnnotationIntrospector(introspector);
- mapper.getSerializationConfig()
- .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
- }
-
- protected void emitMetrics(TimelineMetrics metrics) throws IOException {
- String jsonData = mapper.writeValueAsString(metrics);
-
- SocketAddress socketAddress = getServerSocketAddress();
-
- if (socketAddress != null) {
- StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
-
- PostMethod postMethod = new PostMethod(getCollectorUri());
- postMethod.setRequestEntity(requestEntity);
- int statusCode = httpClient.executeMethod(postMethod);
- if (statusCode != 200) {
- LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
- } else {
- LOG.debug("Metrics posted to Collector " + getCollectorUri());
- }
- }
- }
-
- public void setHttpClient(HttpClient httpClient) {
- this.httpClient = httpClient;
- }
-
- abstract protected SocketAddress getServerSocketAddress();
-
- abstract protected String getCollectorUri();
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 06c3441..5e89e14 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -78,6 +79,9 @@ public class TimelineMetricsCache {
class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
private static final long serialVersionUID = 1L;
private boolean gotOverflow = false;
+ // Last buffered start time per metric name, kept to avoid duplicating a value
+ // at the end of one buffer and the beginning of the next segment of values
+ private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
@Override
protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
@@ -93,7 +97,7 @@ public class TimelineMetricsCache {
TimelineMetricWrapper metricWrapper = this.get(metricName);
if (metricWrapper == null
- || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
+ || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
return null;
}
@@ -104,13 +108,27 @@ public class TimelineMetricsCache {
}
public void put(String metricName, TimelineMetric timelineMetric) {
-
+ if (isDuplicate(timelineMetric)) {
+ return;
+ }
TimelineMetricWrapper metric = this.get(metricName);
if (metric == null) {
this.put(metricName, new TimelineMetricWrapper(timelineMetric));
} else {
metric.putMetric(timelineMetric);
}
+ // Remember the start time of the value just buffered for this metric name
+ endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+ }
+
+ /**
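+ * Tests whether the last buffered timestamp for this metric name equals the start time of the newly received metric.
+ * @param timelineMetric the newly received {@link TimelineMetric}
+ * @return true if the start times match (duplicate), false otherwise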
+ */
+ private boolean isDuplicate(TimelineMetric timelineMetric) {
+ return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+ && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
}
}
@@ -122,6 +140,14 @@ public class TimelineMetricsCache {
return null;
}
+ /**
+ * Returns the maximum eviction time; exposed as a getter to help test eviction.
+ * @return the maximum eviction time in milliseconds
+ */
+ public int getMaxEvictionTimeInMillis() {
+ return maxEvictionTimeInMillis;
+ }
+
public void putTimelineMetric(TimelineMetric timelineMetric) {
timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index d28b345..4207426 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -26,7 +26,7 @@ import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
import org.apache.hadoop.metrics2.util.Servers;
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index fb061a9..a8d3a37 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.*;
import org.apache.hadoop.metrics2.impl.MsInfo;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.util.Servers;
import org.apache.hadoop.net.DNS;
@@ -151,19 +150,19 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
(Collection<AbstractMetric>) record.metrics();
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+ long startTime = record.timestamp();
for (AbstractMetric metric : metrics) {
sb.append(metric.name());
String name = sb.toString();
+ Number value = metric.value();
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(name);
timelineMetric.setHostName(hostName);
timelineMetric.setAppId(serviceName);
- timelineMetric.setStartTime(record.timestamp());
- timelineMetric.setType(ClassUtils.getShortCanonicalName(
- metric.value(), "Number"));
- timelineMetric.getMetricValues().put(record.timestamp(),
- metric.value().doubleValue());
+ timelineMetric.setStartTime(startTime);
+ timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number"));
+ timelineMetric.getMetricValues().put(startTime, value.doubleValue());
// Put intermediate values into the cache until it is time to send
metricsCache.putTimelineMetric(timelineMetric);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 3c83868..d7b5d73 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -23,25 +23,41 @@ import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.easymock.IAnswer;
+import org.easymock.IArgumentMatcher;
+import org.junit.Assert;
+import org.junit.Test;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
-import static org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink.*;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOST_PROPERTY;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMockBuilder;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reportMatcher;
import static org.easymock.EasyMock.verify;
public class HadoopTimelineMetricsSinkTest {
- @org.junit.Test
+ @Test
public void testPutMetrics() throws Exception {
HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
@@ -110,7 +126,127 @@ public class HadoopTimelineMetricsSinkTest {
sink.putMetrics(record);
verify(conf, httpClient, record, metric);
+ }
+
+ @Test
+ public void testDuplicateTimeSeriesNotSaved() throws Exception {
+ HadoopTimelineMetricsSink sink =
+ createMockBuilder(HadoopTimelineMetricsSink.class)
+ .withConstructor().addMockedMethod("appendPrefix")
+ .addMockedMethod("emitMetrics").createNiceMock();
+
+ SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
+ expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+ expect(conf.getParent()).andReturn(null).anyTimes();
+ expect(conf.getPrefix()).andReturn("service").anyTimes();
+ expect(conf.getString(eq(COLLECTOR_HOST_PROPERTY))).andReturn("localhost:63188").anyTimes();
+ expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+
+ expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
+ // Return eviction time smaller than time diff for first 3 entries
+ // Third entry will result in eviction
+ expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+
+ conf.setListDelimiter(eq(','));
+ expectLastCall().anyTimes();
+
+ expect(conf.getKeys()).andReturn(new Iterator() {
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public Object next() {
+ return null;
+ }
+
+ @Override
+ public void remove() {
+
+ }
+ }).once();
+
+ AbstractMetric metric = createNiceMock(AbstractMetric.class);
+ expect(metric.name()).andReturn("metricName").anyTimes();
+ expect(metric.value()).andReturn(1.0).once();
+ expect(metric.value()).andReturn(2.0).once();
+ expect(metric.value()).andReturn(3.0).once();
+ expect(metric.value()).andReturn(4.0).once();
+ expect(metric.value()).andReturn(5.0).once();
+ expect(metric.value()).andReturn(6.0).once();
+ MetricsRecord record = createNiceMock(MetricsRecord.class);
+ expect(record.name()).andReturn("testName").anyTimes();
+ expect(record.context()).andReturn("testContext").anyTimes();
+
+ sink.appendPrefix(eq(record), (StringBuilder) anyObject());
+ expectLastCall().anyTimes().andStubAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ return null;
+ }
+ });
+
+ final Long now = System.currentTimeMillis();
+ // TODO: The current cache implementation needs more than one element before it evicts anything
+ expect(record.timestamp()).andReturn(now).times(2);
+ expect(record.timestamp()).andReturn(now + 100l).times(2);
+ expect(record.timestamp()).andReturn(now + 200l).once();
+ expect(record.timestamp()).andReturn(now + 300l).once();
+
+ expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
+
+ final List<TimelineMetrics> capturedMetrics = new ArrayList<TimelineMetrics>();
+ sink.emitMetrics((TimelineMetrics) anyObject());
+ expectLastCall().andStubAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ capturedMetrics.add((TimelineMetrics) EasyMock.getCurrentArguments()[0]);
+ return null;
+ }
+ });
+
+ replay(conf, sink, record, metric);
+
+ sink.init(conf);
+
+ // time = t1
+ sink.putMetrics(record);
+ // time = t1
+ sink.putMetrics(record);
+ // time = t2
+ sink.putMetrics(record);
+ // Evict
+ // time = t2
+ sink.putMetrics(record);
+ // time = t3
+ sink.putMetrics(record);
+ // time = t4
+ sink.putMetrics(record);
+ verify(conf, sink, record, metric);
+
+ Assert.assertEquals(2, capturedMetrics.size());
+ Iterator<TimelineMetrics> metricsIterator = capturedMetrics.iterator();
+
+ // t1, t2
+ TimelineMetric timelineMetric1 = metricsIterator.next().getMetrics().get(0);
+ Assert.assertEquals(2, timelineMetric1.getMetricValues().size());
+ Iterator<Long> timestamps = timelineMetric1.getMetricValues().keySet().iterator();
+ Assert.assertEquals(now, timestamps.next());
+ Assert.assertEquals(new Long(now + 100l), timestamps.next());
+ Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
+ Assert.assertEquals(new Double(1.0), values.next());
+ Assert.assertEquals(new Double(3.0), values.next());
+ // t3, t4
+ TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
+ Assert.assertEquals(2, timelineMetric2.getMetricValues().size());
+ timestamps = timelineMetric2.getMetricValues().keySet().iterator();
+ Assert.assertEquals(new Long(now + 200l), timestamps.next());
+ Assert.assertEquals(new Long(now + 300l), timestamps.next());
+ values = timelineMetric2.getMetricValues().values().iterator();
+ Assert.assertEquals(new Double(5.0), values.next());
+ Assert.assertEquals(new Double(6.0), values.next());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index a8f35e4..1fd81de 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
import org.apache.hadoop.metrics2.util.Servers;