Posted to commits@ambari.apache.org by sw...@apache.org on 2015/01/05 23:35:48 UTC

ambari git commit: AMBARI-8994. AMS : Yarn service - RPC metrics returns duplicate array elements. (swagle)

Repository: ambari
Updated Branches:
  refs/heads/trunk 83b8ab969 -> dec5f7d69


AMBARI-8994. AMS : Yarn service - RPC metrics returns duplicate array elements. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dec5f7d6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dec5f7d6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dec5f7d6

Branch: refs/heads/trunk
Commit: dec5f7d69c43db12a4080d6e65de8be102936fb1
Parents: 83b8ab9
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Mon Jan 5 14:19:27 2015 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Mon Jan 5 14:19:34 2015 -0800

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   |  83 +++++++++++
 .../base/AbstractTimelineMetricsSink.java       |  79 -----------
 .../timeline/cache/TimelineMetricsCache.java    |  30 +++-
 .../sink/flume/FlumeTimelineMetricsSink.java    |   2 +-
 .../timeline/HadoopTimelineMetricsSink.java     |  11 +-
 .../timeline/HadoopTimelineMetricsSinkTest.java | 142 ++++++++++++++++++-
 .../sink/storm/StormTimelineMetricsSink.java    |   2 +-
 7 files changed, 257 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
new file mode 100644
index 0000000..a382ccb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+public abstract class AbstractTimelineMetricsSink {
+  public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
+  public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
+  public static final String METRICS_SEND_INTERVAL = "sendInterval";
+  public static final String COLLECTOR_HOST_PROPERTY = "collector";
+
+  protected final Log LOG;
+  private HttpClient httpClient = new HttpClient();
+
+  protected static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+        .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  public AbstractTimelineMetricsSink() {
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  protected void emitMetrics(TimelineMetrics metrics) throws IOException {
+    String jsonData = mapper.writeValueAsString(metrics);
+
+    SocketAddress socketAddress = getServerSocketAddress();
+
+    if (socketAddress != null) {
+      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
+
+      PostMethod postMethod = new PostMethod(getCollectorUri());
+      postMethod.setRequestEntity(requestEntity);
+      int statusCode = httpClient.executeMethod(postMethod);
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
+      } else {
+        LOG.debug("Metrics posted to Collector " + getCollectorUri());
+      }
+    }
+  }
+
+  public void setHttpClient(HttpClient httpClient) {
+    this.httpClient = httpClient;
+  }
+
+  abstract protected SocketAddress getServerSocketAddress();
+
+  abstract protected String getCollectorUri();
+}
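
For illustration only (not part of this patch): a minimal sketch of a concrete sink
built on the relocated base class. The class name, host, port, and URI path below are
made up; the Flume, Hadoop, and Storm sinks touched by this commit obtain the real
values from their own configuration (see COLLECTOR_HOST_PROPERTY above).

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

// Hypothetical sink used only to show the contract of the abstract base class.
public class ExampleTimelineMetricsSink extends AbstractTimelineMetricsSink {
  private final String collectorHost = "localhost"; // illustrative value
  private final int collectorPort = 6188;           // illustrative value

  @Override
  protected SocketAddress getServerSocketAddress() {
    // emitMetrics() only posts when this returns a non-null address,
    // so returning null effectively disables the sink.
    return new InetSocketAddress(collectorHost, collectorPort);
  }

  @Override
  protected String getCollectorUri() {
    // Illustrative URI; a real sink derives the collector endpoint from its configuration.
    return "http://" + collectorHost + ":" + collectorPort + "/ws/v1/timeline/metrics";
  }

  // emitMetrics() serializes the batch to JSON with the shared ObjectMapper
  // and POSTs it to getCollectorUri(), logging on a non-200 response.
  public void send(TimelineMetrics metrics) throws IOException {
    emitMetrics(metrics);
  }
}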

http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
deleted file mode 100644
index d51ee67..0000000
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline.base;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.httpclient.methods.StringRequestEntity;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-
-public abstract class AbstractTimelineMetricsSink {
-  public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
-  public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
-  public static final String METRICS_SEND_INTERVAL = "sendInterval";
-  public static final String COLLECTOR_HOST_PROPERTY = "collector";
-
-  protected final Log LOG = LogFactory.getLog(this.getClass());
-  private HttpClient httpClient = new HttpClient();
-
-  protected static ObjectMapper mapper;
-
-  static {
-    mapper = new ObjectMapper();
-    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-    mapper.setAnnotationIntrospector(introspector);
-    mapper.getSerializationConfig()
-        .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-  }
-
-  protected void emitMetrics(TimelineMetrics metrics) throws IOException {
-    String jsonData = mapper.writeValueAsString(metrics);
-
-    SocketAddress socketAddress = getServerSocketAddress();
-
-    if (socketAddress != null) {
-      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
-
-      PostMethod postMethod = new PostMethod(getCollectorUri());
-      postMethod.setRequestEntity(requestEntity);
-      int statusCode = httpClient.executeMethod(postMethod);
-      if (statusCode != 200) {
-        LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
-      } else {
-        LOG.debug("Metrics posted to Collector " + getCollectorUri());
-      }
-    }
-  }
-
-  public void setHttpClient(HttpClient httpClient) {
-    this.httpClient = httpClient;
-  }
-
-  abstract protected SocketAddress getServerSocketAddress();
-
-  abstract protected String getCollectorUri();
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 06c3441..5e89e14 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -78,6 +79,9 @@ public class TimelineMetricsCache {
   class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
     private static final long serialVersionUID = 1L;
     private boolean gotOverflow = false;
+    // Avoid duplicating values across the end of one buffered segment and
+    // the beginning of the next
+    private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
 
     @Override
     protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
@@ -93,7 +97,7 @@ public class TimelineMetricsCache {
       TimelineMetricWrapper metricWrapper = this.get(metricName);
 
       if (metricWrapper == null
-        || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
+        || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
         return null;
       }
 
@@ -104,13 +108,27 @@ public class TimelineMetricsCache {
     }
 
     public void put(String metricName, TimelineMetric timelineMetric) {
-
+      if (isDuplicate(timelineMetric)) {
+        return;
+      }
       TimelineMetricWrapper metric = this.get(metricName);
       if (metric == null) {
         this.put(metricName, new TimelineMetricWrapper(timelineMetric));
       } else {
         metric.putMetric(timelineMetric);
       }
+      // Remember the start time of the last buffered value
+      endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+    }
+
+    /**
+     * Test whether the last buffered timestamp is the same as the newly received one.
+     * @param timelineMetric the incoming {@link TimelineMetric}
+     * @return true if the start time was already buffered, false otherwise
+     */
+    private boolean isDuplicate(TimelineMetric timelineMetric) {
+      return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+        && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
     }
   }
 
@@ -122,6 +140,14 @@ public class TimelineMetricsCache {
     return null;
   }
 
+  /**
+   * Getter exposed so that eviction behavior can be tested.
+   * @return the maximum eviction time in milliseconds
+   */
+  public int getMaxEvictionTimeInMillis() {
+    return maxEvictionTimeInMillis;
+  }
+
   public void putTimelineMetric(TimelineMetric timelineMetric) {
     timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
   }
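
To spell out the idea behind the new endOfBufferTimestamps map, here is a standalone
sketch (not Ambari code): remember the start time of the last value buffered per metric
name and drop an incoming value that repeats it, so the tail of one buffered segment is
not reported again at the head of the next. With the timestamp sequence used in the new
test below (t, t, t+100, t+100, t+200, t+300), only the values at t, t+100, t+200 and
t+300 are accepted.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the duplicate-timestamp guard added to TimelineMetricHolder.
public class EndOfBufferGuard {
  private final Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();

  // True if this (metricName, startTime) pair repeats the last accepted one.
  public boolean isDuplicate(String metricName, long startTime) {
    Long last = endOfBufferTimestamps.get(metricName);
    return last != null && last.longValue() == startTime;
  }

  // Record the start time of the value that was just buffered.
  public void accept(String metricName, long startTime) {
    endOfBufferTimestamps.put(metricName, startTime);
  }
}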

http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index d28b345..4207426 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -26,7 +26,7 @@ import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 import org.apache.hadoop.metrics2.util.Servers;

http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index fb061a9..a8d3a37 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.*;
 import org.apache.hadoop.metrics2.impl.MsInfo;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
@@ -151,19 +150,19 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
         (Collection<AbstractMetric>) record.metrics();
 
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+      long startTime = record.timestamp();
 
       for (AbstractMetric metric : metrics) {
         sb.append(metric.name());
         String name = sb.toString();
+        Number value = metric.value();
         TimelineMetric timelineMetric = new TimelineMetric();
         timelineMetric.setMetricName(name);
         timelineMetric.setHostName(hostName);
         timelineMetric.setAppId(serviceName);
-        timelineMetric.setStartTime(record.timestamp());
-        timelineMetric.setType(ClassUtils.getShortCanonicalName(
-          metric.value(), "Number"));
-        timelineMetric.getMetricValues().put(record.timestamp(),
-          metric.value().doubleValue());
+        timelineMetric.setStartTime(startTime);
+        timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number"));
+        timelineMetric.getMetricValues().put(startTime, value.doubleValue());
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 3c83868..d7b5d73 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -23,25 +23,41 @@ import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.easymock.IArgumentMatcher;
+import org.junit.Assert;
+import org.junit.Test;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
-import static org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink.*;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOST_PROPERTY;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reportMatcher;
 import static org.easymock.EasyMock.verify;
 
 public class HadoopTimelineMetricsSinkTest {
 
-  @org.junit.Test
+  @Test
   public void testPutMetrics() throws Exception {
     HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
 
@@ -110,7 +126,127 @@ public class HadoopTimelineMetricsSinkTest {
     sink.putMetrics(record);
 
     verify(conf, httpClient, record, metric);
+  }
+
+  @Test
+  public void testDuplicateTimeSeriesNotSaved() throws Exception {
+    HadoopTimelineMetricsSink sink =
+      createMockBuilder(HadoopTimelineMetricsSink.class)
+        .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("emitMetrics").createNiceMock();
+
+    SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
+    expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    expect(conf.getParent()).andReturn(null).anyTimes();
+    expect(conf.getPrefix()).andReturn("service").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_HOST_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+
+    expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
+    // Return eviction time smaller than time diff for first 3 entries
+    // Third entry will result in eviction
+    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+
+    conf.setListDelimiter(eq(','));
+    expectLastCall().anyTimes();
+
+    expect(conf.getKeys()).andReturn(new Iterator() {
+      @Override
+      public boolean hasNext() {
+        return false;
+      }
+
+      @Override
+      public Object next() {
+        return null;
+      }
+
+      @Override
+      public void remove() {
+
+      }
+    }).once();
+
+    AbstractMetric metric = createNiceMock(AbstractMetric.class);
+    expect(metric.name()).andReturn("metricName").anyTimes();
+    expect(metric.value()).andReturn(1.0).once();
+    expect(metric.value()).andReturn(2.0).once();
+    expect(metric.value()).andReturn(3.0).once();
+    expect(metric.value()).andReturn(4.0).once();
+    expect(metric.value()).andReturn(5.0).once();
+    expect(metric.value()).andReturn(6.0).once();
 
+    MetricsRecord record = createNiceMock(MetricsRecord.class);
+    expect(record.name()).andReturn("testName").anyTimes();
+    expect(record.context()).andReturn("testContext").anyTimes();
+
+    sink.appendPrefix(eq(record), (StringBuilder) anyObject());
+    expectLastCall().anyTimes().andStubAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        return null;
+      }
+    });
+
+    final Long now = System.currentTimeMillis();
+    // TODO: Current implementation of cache needs > 1 element to evict any
+    expect(record.timestamp()).andReturn(now).times(2);
+    expect(record.timestamp()).andReturn(now + 100l).times(2);
+    expect(record.timestamp()).andReturn(now + 200l).once();
+    expect(record.timestamp()).andReturn(now + 300l).once();
+
+    expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
+
+    final List<TimelineMetrics> capturedMetrics = new ArrayList<TimelineMetrics>();
+    sink.emitMetrics((TimelineMetrics) anyObject());
+    expectLastCall().andStubAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        capturedMetrics.add((TimelineMetrics) EasyMock.getCurrentArguments()[0]);
+        return null;
+      }
+    });
+
+    replay(conf, sink, record, metric);
+
+    sink.init(conf);
+
+    // time = t1
+    sink.putMetrics(record);
+    // time = t1
+    sink.putMetrics(record);
+    // time = t2
+    sink.putMetrics(record);
+    // Evict
+    // time = t2
+    sink.putMetrics(record);
+    // time = t3
+    sink.putMetrics(record);
+    // time = t4
+    sink.putMetrics(record);
 
+    verify(conf, sink, record, metric);
+
+    Assert.assertEquals(2, capturedMetrics.size());
+    Iterator<TimelineMetrics> metricsIterator = capturedMetrics.iterator();
+
+    // t1, t2
+    TimelineMetric timelineMetric1 = metricsIterator.next().getMetrics().get(0);
+    Assert.assertEquals(2, timelineMetric1.getMetricValues().size());
+    Iterator<Long> timestamps = timelineMetric1.getMetricValues().keySet().iterator();
+    Assert.assertEquals(now, timestamps.next());
+    Assert.assertEquals(new Long(now + 100l), timestamps.next());
+    Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
+    Assert.assertEquals(new Double(1.0), values.next());
+    Assert.assertEquals(new Double(3.0), values.next());
+    // t3, t4
+    TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
+    Assert.assertEquals(2, timelineMetric2.getMetricValues().size());
+    timestamps = timelineMetric2.getMetricValues().keySet().iterator();
+    Assert.assertEquals(new Long(now + 200l), timestamps.next());
+    Assert.assertEquals(new Long(now + 300l), timestamps.next());
+    values = timelineMetric2.getMetricValues().values().iterator();
+    Assert.assertEquals(new Double(5.0), values.next());
+    Assert.assertEquals(new Double(6.0), values.next());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/dec5f7d6/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index a8f35e4..1fd81de 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 import org.apache.hadoop.metrics2.util.Servers;