You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2014/12/05 20:42:13 UTC

[2/2] ambari git commit: AMBARI-8522. Enable Flume metrics sink to AMS. (Szilard Nemethy via mpapirkovskyy)

AMBARI-8522. Enable Flume metrics sink to AMS. (Szilard Nemethy via mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b42150ca
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b42150ca
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b42150ca

Branch: refs/heads/trunk
Commit: b42150cad1fb21d777519e029154614072a5b219
Parents: 9cdd4e2
Author: Myroslav Papirkovskyy <mp...@hortonworks.com>
Authored: Fri Dec 5 21:34:58 2014 +0200
Committer: Myroslav Papirkovskyy <mp...@hortonworks.com>
Committed: Fri Dec 5 21:42:06 2014 +0200

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-common/pom.xml    |  81 +++++++
 .../metrics2/sink/timeline/TimelineMetric.java  | 172 +++++++++++++++
 .../metrics2/sink/timeline/TimelineMetrics.java | 101 +++++++++
 .../base/AbstractTimelineMetricsSink.java       |  79 +++++++
 .../timeline/cache/TimelineMetricsCache.java    | 128 +++++++++++
 .../timeline/configuration/Configuration.java   |  62 ++++++
 .../ambari-metrics-flume-sink/pom.xml           | 181 ++++++++++++++++
 .../src/main/assemblies/empty.xml               |  21 ++
 .../src/main/assemblies/sink.xml                |  34 +++
 .../src/main/conf/flume-metrics2.properties.j2  |  22 ++
 .../sink/flume/FlumeTimelineMetricsSink.java    | 176 ++++++++++++++++
 .../flume/FlumeTimelineMetricsSinkTest.java     | 117 ++++++++++
 .../ambari-metrics-hadoop-sink/pom.xml          |  12 +-
 .../conf/hadoop-metrics2-hbase.properties.j2    |   8 +-
 .../src/main/conf/hadoop-metrics2.properties.j2 |   2 +-
 .../timeline/AbstractTimelineMetricsSink.java   | 101 ---------
 .../timeline/HadoopTimelineMetricsSink.java     | 211 +++++++++++++++++++
 .../metrics2/sink/timeline/TimelineMetric.java  | 172 ---------------
 .../metrics2/sink/timeline/TimelineMetrics.java | 102 ---------
 .../sink/timeline/TimelineMetricsCache.java     | 128 -----------
 .../sink/timeline/TimelineMetricsSink.java      | 211 -------------------
 .../ambari-metrics-timelineservice/pom.xml      |   2 +-
 ambari-metrics/pom.xml                          |   2 +
 23 files changed, 1399 insertions(+), 726 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
new file mode 100644
index 0000000..786ad93
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-common</artifactId>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.8.0</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
new file mode 100644
index 0000000..68b4be8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+import java.util.TreeMap;
+
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetric implements Comparable<TimelineMetric> {
+
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private long timestamp;
+  private long startTime;
+  private String type;
+  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  @XmlElement(name = "metricname")
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  @XmlElement(name = "appid")
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  @XmlElement(name = "instanceid")
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  @XmlElement(name = "hostname")
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @XmlElement(name = "starttime")
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  @XmlElement(name = "metrics")
+  public Map<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  public void setMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineMetric metric = (TimelineMetric) o;
+
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+    if (timestamp != metric.timestamp) return false;
+    if (startTime != metric.startTime) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public int compareTo(TimelineMetric other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else {
+      return metricName.compareTo(other.metricName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
new file mode 100644
index 0000000..4355fb1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The class that hosts a list of timeline entities.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetrics {
+
+  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+  public TimelineMetrics() {}
+
+  @XmlElement(name = "metrics")
+  public List<TimelineMetric> getMetrics() {
+    return allMetrics;
+  }
+
+  public void setMetrics(List<TimelineMetric> allMetrics) {
+    this.allMetrics = allMetrics;
+  }
+
+  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+                                         TimelineMetric metric2) {
+
+    boolean isEqual = true;
+
+    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+      return false;
+    }
+
+    if (metric1.getHostName() != null) {
+      isEqual = metric1.getHostName().equals(metric2.getHostName());
+    }
+
+    if (metric1.getAppId() != null) {
+      isEqual = metric1.getAppId().equals(metric2.getAppId());
+    }
+
+    return isEqual;
+  }
+
+  /**
+   * Merge with existing TimelineMetric if everything except startTime is
+   * the same.
+   * @param metric {@link TimelineMetric}
+   */
+  public void addOrMergeTimelineMetric(TimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    if (!allMetrics.isEmpty()) {
+      for (TimelineMetric timelineMetric : allMetrics) {
+        if (timelineMetric.equalsExceptTime(metric)) {
+          metricToMerge = timelineMetric;
+          break;
+        }
+      }
+    }
+
+    if (metricToMerge != null) {
+      metricToMerge.addMetricValues(metric.getMetricValues());
+      if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+        metricToMerge.setTimestamp(metric.getTimestamp());
+      }
+      if (metricToMerge.getStartTime() > metric.getStartTime()) {
+        metricToMerge.setStartTime(metric.getStartTime());
+      }
+    } else {
+      allMetrics.add(metric);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
new file mode 100644
index 0000000..d51ee67
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.base;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+public abstract class AbstractTimelineMetricsSink {
+  public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
+  public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
+  public static final String METRICS_SEND_INTERVAL = "sendInterval";
+  public static final String COLLECTOR_HOST_PROPERTY = "collector";
+
+  protected final Log LOG = LogFactory.getLog(this.getClass());
+  private HttpClient httpClient = new HttpClient();
+
+  protected static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+        .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  protected void emitMetrics(TimelineMetrics metrics) throws IOException {
+    String jsonData = mapper.writeValueAsString(metrics);
+
+    SocketAddress socketAddress = getServerSocketAddress();
+
+    if (socketAddress != null) {
+      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
+
+      PostMethod postMethod = new PostMethod(getCollectorUri());
+      postMethod.setRequestEntity(requestEntity);
+      int statusCode = httpClient.executeMethod(postMethod);
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
+      } else {
+        LOG.debug("Metrics posted to Collector " + getCollectorUri());
+      }
+    }
+  }
+
+  public void setHttpClient(HttpClient httpClient) {
+    this.httpClient = httpClient;
+  }
+
+  abstract protected SocketAddress getServerSocketAddress();
+
+  abstract protected String getCollectorUri();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
new file mode 100644
index 0000000..06c3441
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.cache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TimelineMetricsCache {
+
+  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
+  private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
+  public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
+  public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
+  private final int maxRecsPerName;
+  private final int maxEvictionTimeInMillis;
+
+  public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
+    this.maxRecsPerName = maxRecsPerName;
+    this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
+  }
+
+  class TimelineMetricWrapper {
+    private long timeDiff = -1;
+    private long oldestTimestamp = -1;
+    private TimelineMetric timelineMetric;
+
+    TimelineMetricWrapper(TimelineMetric timelineMetric) {
+      this.timelineMetric = timelineMetric;
+      this.oldestTimestamp = timelineMetric.getStartTime();
+    }
+
+    private void updateTimeDiff(long timestamp) {
+      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+        timeDiff = timestamp - oldestTimestamp;
+      } else {
+        oldestTimestamp = timestamp;
+      }
+    }
+
+    public void putMetric(TimelineMetric metric) {
+      this.timelineMetric.addMetricValues(metric.getMetricValues());
+      updateTimeDiff(metric.getStartTime());
+    }
+
+    public long getTimeDiff() {
+      return timeDiff;
+    }
+
+    public TimelineMetric getTimelineMetric() {
+      return timelineMetric;
+    }
+  }
+
+  // TODO: Change to ConcurentHashMap with weighted eviction
+  class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
+    private static final long serialVersionUID = 1L;
+    private boolean gotOverflow = false;
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
+      boolean overflow = size() > maxRecsPerName;
+      if (overflow && !gotOverflow) {
+        LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest);
+        gotOverflow = true;
+      }
+      return overflow;
+    }
+
+    public TimelineMetric evict(String metricName) {
+      TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+      if (metricWrapper == null
+        || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
+        return null;
+      }
+
+      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+      this.remove(metricName);
+
+      return timelineMetric;
+    }
+
+    public void put(String metricName, TimelineMetric timelineMetric) {
+
+      TimelineMetricWrapper metric = this.get(metricName);
+      if (metric == null) {
+        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+      } else {
+        metric.putMetric(timelineMetric);
+      }
+    }
+  }
+
+  public TimelineMetric getTimelineMetric(String metricName) {
+    if (timelineMetricCache.containsKey(metricName)) {
+      return timelineMetricCache.evict(metricName);
+    }
+
+    return null;
+  }
+
+  public void putTimelineMetric(TimelineMetric timelineMetric) {
+    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
new file mode 100644
index 0000000..940ea75
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline.configuration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class Configuration {
+  public final Log LOG = LogFactory.getLog(this.getClass());
+  private final Properties properties;
+
+  public Configuration(String configFile) {
+    properties = new Properties();
+
+    //Get property file stream from classpath
+    InputStream inputStream = Configuration.class.getResourceAsStream(configFile);
+
+    if (inputStream == null) {
+      throw new IllegalArgumentException(configFile + " not found in classpath");
+    }
+
+    // load the properties
+    try {
+      properties.load(inputStream);
+      inputStream.close();
+    } catch (FileNotFoundException fnf) {
+      LOG.info("No configuration file " + configFile + " found in classpath.", fnf);
+    } catch (IOException ie) {
+      throw new IllegalArgumentException("Can't read configuration file " +
+          configFile, ie);
+    }
+  }
+
+  public String getProperty(String key) {
+    return properties.getProperty(key);
+  }
+
+  public String getProperty(String key, String defaultValue) {
+    return properties.getProperty(key, defaultValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/pom.xml b/ambari-metrics/ambari-metrics-flume-sink/pom.xml
new file mode 100644
index 0000000..3021438
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-flume-sink</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assemblies/sink.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+        </configuration>
+        <executions>
+          <execution>
+            <id>build-tarball</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.goldin</groupId>
+        <artifactId>copy-maven-plugin</artifactId>
+        <version>0.2.5</version>
+        <executions>
+          <execution>
+            <id>create-archive</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>rpm-maven-plugin</artifactId>
+        <version>2.0.1</version>
+        <executions>
+          <execution>
+            <phase>none</phase>
+            <goals>
+              <goal>rpm</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <name>ambari-metrics-flume-sink</name>
+          <copyright>2012, Apache Software Foundation</copyright>
+          <group>Development</group>
+          <description>Maven Recipe: RPM Package.</description>
+          <mappings>
+            <mapping>
+              <directory>/usr/lib/flume/lib</directory>
+              <filemode>644</filemode>
+              <username>root</username>
+              <groupname>root</groupname>
+              <sources>
+                <source>
+                  <location>target/${project.artifactId}-${project.version}.jar</location>
+                </source>
+                <source>
+                  <location>target/lib</location>
+                </source>
+              </sources>
+            </mapping>
+          </mappings>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>1.5.1</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-common</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>4.10</version>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-easymock</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml
new file mode 100644
index 0000000..21a6b36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly>
+  <!--This 'all' id is not appended to the produced bundle because we do this:
+    http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers
+  -->
+  <id>dist</id>
+  <formats>
+    <format>dir</format>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <files>
+    <file>
+      <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source>
+      <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory>
+    </file>
+  </files>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
new file mode 100644
index 0000000..7458bf8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
@@ -0,0 +1,22 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+collector={{metric_collector_host}}:8188
+collectionFrequency=60000
+maxRowCacheSize=10000
+sendInterval=59000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
new file mode 100644
index 0000000..87c4ab8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.flume;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+import org.apache.hadoop.metrics2.util.Servers;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
+  private SocketAddress socketAddress;
+  private String collectorUri;
+  private TimelineMetricsCache metricsCache;
+  private ScheduledExecutorService scheduledExecutorService;
+  private long pollFrequency;
+  private String hostname;
+
+  @Override
+  public void start() {
+    LOG.info("Starting Flume Metrics Sink");
+    TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector();
+    if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) {
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    }
+    scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0,
+        pollFrequency, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping Flume Metrics Sink");
+    scheduledExecutorService.shutdown();
+  }
+
+  @Override
+  public void configure(Context context) {
+    LOG.info("Context parameters " + context);
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Could not identify hostname.");
+      throw new FlumeException("Could not identify hostname.", e);
+    }
+    Configuration configuration = new Configuration("/flume-metrics2.properties");
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+    collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + "/ws/v1/timeline/metrics";
+    List<InetSocketAddress> socketAddresses =
+        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), 8188);
+    if (socketAddresses != null && !socketAddresses.isEmpty()) {
+      socketAddress = socketAddresses.get(0);
+    }
+    pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
+  }
+
+  @Override
+  public SocketAddress getServerSocketAddress() {
+    return socketAddress;
+  }
+
+  @Override
+  public String getCollectorUri() {
+    return collectorUri;
+  }
+
+  public void setPollFrequency(long pollFrequency) {
+    this.pollFrequency = pollFrequency;
+  }
+
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+  /**
+   * Worker which polls JMX for all mbeans with
+   * {@link javax.management.ObjectName} within the flume namespace:
+   * org.apache.flume. All attributes of such beans are sent
+   * to the metrics collector service.
+   */
+  private class TimelineMetricsCollector implements Runnable {
+    @Override
+    public void run() {
+      LOG.debug("Collecting Metrics for Flume");
+      try {
+        Map<String, Map<String, String>> metricsMap =
+            JMXPollUtil.getAllMBeans();
+        long currentTimeMillis = System.currentTimeMillis();
+        for (String component : metricsMap.keySet()) {
+          Map<String, String> attributeMap = metricsMap.get(component);
+          LOG.info("Attributes for component " + component);
+          processComponentAttributes(currentTimeMillis, component, attributeMap);
+        }
+      } catch (Exception e) {
+        LOG.error("Unexpected error", e);
+      }
+      LOG.debug("Finished collecting Metrics for Flume");
+    }
+
+    private void processComponentAttributes(long currentTimeMillis, String component, Map<String, String> attributeMap) throws IOException {
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+      for (String attributeName : attributeMap.keySet()) {
+        String attributeValue = attributeMap.get(attributeName);
+        if (NumberUtils.isNumber(attributeValue)) {
+          LOG.info(attributeName + " = " + attributeValue);
+          TimelineMetric timelineMetric = createTimelineMetric(currentTimeMillis,
+              component, attributeName, attributeValue);
+          // Put intermediate values into the cache until it is time to send
+          metricsCache.putTimelineMetric(timelineMetric);
+
+          TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName);
+
+          if (cachedMetric != null) {
+            metricList.add(cachedMetric);
+          }
+        }
+      }
+
+      if (!metricList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricList);
+        emitMetrics(timelineMetrics);
+      }
+    }
+
+    private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(attributeName);
+      timelineMetric.setHostName(hostname);
+      timelineMetric.setAppId("flume." + component);
+      timelineMetric.setStartTime(currentTimeMillis);
+      timelineMetric.setType(ClassUtils.getShortCanonicalName(
+          attributeValue, "Number"));
+      timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+      return timelineMetric;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
new file mode 100644
index 0000000..0275db6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.flume;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+
+import static org.powermock.api.easymock.PowerMock.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JMXPollUtil.class)
+public class FlumeTimelineMetricsSinkTest {
+  @Test
+  public void testNonNumericMetricMetricExclusion() throws InterruptedException {
+    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+    flumeTimelineMetricsSink.setPollFrequency(1);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    flumeTimelineMetricsSink.setHttpClient(httpClient);
+    mockStatic(JMXPollUtil.class);
+    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
+        Collections.singletonMap("component1", Collections.singletonMap("key1", "value1"))).once();
+    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+    flumeTimelineMetricsSink.start();
+    Thread.sleep(5);
+    flumeTimelineMetricsSink.stop();
+    verifyAll();
+  }
+
+  @Test
+  public void testNumericMetricMetricSubmission() throws InterruptedException {
+    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+    flumeTimelineMetricsSink.setPollFrequency(1);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    flumeTimelineMetricsSink.setHttpClient(httpClient);
+    mockStatic(JMXPollUtil.class);
+    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
+        Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
+    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+    flumeTimelineMetricsSink.start();
+    Thread.sleep(5);
+    flumeTimelineMetricsSink.stop();
+    verifyAll();
+  }
+
+  private TimelineMetricsCache getTimelineMetricsCache(FlumeTimelineMetricsSink flumeTimelineMetricsSink) {
+    TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
+    flumeTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
+    EasyMock.expectLastCall().once();
+    return timelineMetricsCache;
+  }
+
+  @Test
+  public void testMonitorRestart() throws InterruptedException {
+    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+    flumeTimelineMetricsSink.setPollFrequency(1);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    flumeTimelineMetricsSink.setHttpClient(httpClient);
+    mockStatic(JMXPollUtil.class);
+    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
+        Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
+    flumeTimelineMetricsSink.start();
+    flumeTimelineMetricsSink.stop();
+    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+    flumeTimelineMetricsSink.start();
+    Thread.sleep(5);
+    flumeTimelineMetricsSink.stop();
+    verifyAll();
+  }
+
+  @Test
+  public void testMetricsRetrievalExceptionTolerance() throws InterruptedException {
+    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+    flumeTimelineMetricsSink.setPollFrequency(1);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    flumeTimelineMetricsSink.setHttpClient(httpClient);
+    mockStatic(JMXPollUtil.class);
+    EasyMock.expect(JMXPollUtil.getAllMBeans()).
+        andThrow(new RuntimeException("Failed to retrieve Flume Properties")).once();
+    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+    flumeTimelineMetricsSink.start();
+    Thread.sleep(5);
+    flumeTimelineMetricsSink.stop();
+    verifyAll();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
index 0397e2e..1e854e2 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
@@ -133,6 +133,11 @@ limitations under the License.
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-common</artifactId>
+        <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>2.4.0</version>
@@ -176,12 +181,7 @@ limitations under the License.
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-core-asl</artifactId>
-      <version>1.9.9</version>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-      <version>1.9.13</version>
+      <version>1.8.0</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
index 6e64421..0d13498 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
@@ -29,21 +29,21 @@
 hbase.extendedperiod = 3600
 
 # Configuration of the "hbase" context for timeline metrics service
-hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.period=10
 hbase.collector={{timeline_server_hosts}}:8188
 
 # Configuration of the "jvm" context for timeline metrics service
-jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+jvm.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 jvm.period=10
 jvm.collector={{timeline_server_hosts}}:8188
 
 # Configuration of the "rpc" context for timeline metrics service
-rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+rpc.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 rpc.period=10
 rpc.collector={{timeline_server_hosts}}:8188
 
 # Following hadoop example
-hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.sink.timeline.period=10
 hbase.sink.timeline.collector={{timeline_server_hosts}}:8188
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
index 7a00a7e..76a00d1 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
@@ -37,7 +37,7 @@
 {% if has_ganglia_server %}
 *.period=60
 
-*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 *.sink.timeline.period=10
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
deleted file mode 100644
index 2c42274..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.MetricsSink;
-import org.apache.hadoop.metrics2.util.Servers;
-import org.apache.hadoop.net.DNS;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-
-public abstract class AbstractTimelineMetricsSink implements MetricsSink {
-
-  public final Log LOG = LogFactory.getLog(this.getClass());
-
-  private SubsetConfiguration conf;
-  private String hostName = "UNKNOWN.example.com";
-  private String serviceName = "";
-  private final String COLLECTOR_HOST_PROPERTY = "collector";
-  private final int DEFAULT_PORT = 8188;
-
-  private List<? extends SocketAddress> metricsServers;
-  private String collectorUri;
-
-  @Override
-  public void init(SubsetConfiguration conf) {
-    this.conf = conf;
-    LOG.info("Initializing Timeline metrics sink.");
-
-    // Take the hostname from the DNS class.
-    if (conf.getString("slave.host.name") != null) {
-      hostName = conf.getString("slave.host.name");
-    } else {
-      try {
-        hostName = DNS.getDefaultHost(
-          conf.getString("dfs.datanode.dns.interface", "default"),
-          conf.getString("dfs.datanode.dns.nameserver", "default"));
-      } catch (UnknownHostException uhe) {
-        LOG.error(uhe);
-        hostName = "UNKNOWN.example.com";
-      }
-    }
-
-    serviceName = getFirstConfigPrefix(conf);
-
-    // Load collector configs
-    metricsServers = Servers.parse(conf.getString(COLLECTOR_HOST_PROPERTY),
-      DEFAULT_PORT);
-
-    if (metricsServers == null || metricsServers.isEmpty()) {
-      LOG.error("No Metric collector configured.");
-    } else {
-      collectorUri = "http://" + conf.getString(COLLECTOR_HOST_PROPERTY).trim()
-        + "/ws/v1/timeline/metrics";
-    }
-  }
-
-  protected String getHostName() {
-    return hostName;
-  }
-
-  protected String getServiceName() {
-    return serviceName;
-  }
-
-  private String getFirstConfigPrefix(SubsetConfiguration conf) {
-    while (conf.getParent() instanceof SubsetConfiguration) {
-      conf = (SubsetConfiguration) conf.getParent();
-    }
-    return conf.getPrefix();
-  }
-
-  protected SocketAddress getServerSocketAddress() {
-    if (metricsServers != null && !metricsServers.isEmpty()) {
-      return metricsServers.get(0);
-    }
-    return null;
-  }
-
-  protected String getCollectorUri() {
-    return collectorUri;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
new file mode 100644
index 0000000..8fcf464
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.*;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink {
+  private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
+  private TimelineMetricsCache metricsCache;
+  private String hostName = "UNKNOWN.example.com";
+  private String serviceName = "";
+  private List<? extends SocketAddress> metricsServers;
+  private String collectorUri;
+
+  @Override
+  public void init(SubsetConfiguration conf) {
+    LOG.info("Initializing Timeline metrics sink.");
+
+    // Take the hostname from the DNS class.
+    if (conf.getString("slave.host.name") != null) {
+      hostName = conf.getString("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+            conf.getString("dfs.datanode.dns.interface", "default"),
+            conf.getString("dfs.datanode.dns.nameserver", "default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+        hostName = "UNKNOWN.example.com";
+      }
+    }
+
+    serviceName = getFirstConfigPrefix(conf);
+
+    // Load collector configs
+    metricsServers = Servers.parse(conf.getString(COLLECTOR_HOST_PROPERTY), 8188);
+
+    if (metricsServers == null || metricsServers.isEmpty()) {
+      LOG.error("No Metric collector configured.");
+    } else {
+      collectorUri = "http://" + conf.getString(COLLECTOR_HOST_PROPERTY).trim()
+          + "/ws/v1/timeline/metrics";
+    }
+
+    int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE,
+      TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
+    int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
+      TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+
+    conf.setListDelimiter(',');
+    Iterator<String> it = (Iterator<String>) conf.getKeys();
+    while (it.hasNext()) {
+      String propertyName = it.next();
+      if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
+        String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
+        String[] tags = conf.getStringArray(propertyName);
+        boolean useAllTags = false;
+        Set<String> set = null;
+        if (tags.length > 0) {
+          set = new HashSet<String>();
+          for (String tag : tags) {
+            tag = tag.trim();
+            useAllTags |= tag.equals("*");
+            if (tag.length() > 0) {
+              set.add(tag);
+            }
+          }
+          if (useAllTags) {
+            set = null;
+          }
+        }
+        useTagsMap.put(contextName, set);
+      }
+    }
+  }
+
+  private String getFirstConfigPrefix(SubsetConfiguration conf) {
+    while (conf.getParent() instanceof SubsetConfiguration) {
+      conf = (SubsetConfiguration) conf.getParent();
+    }
+    return conf.getPrefix();
+  }
+
+  protected SocketAddress getServerSocketAddress() {
+    if (metricsServers != null && !metricsServers.isEmpty()) {
+      return metricsServers.get(0);
+    }
+    return null;
+  }
+
+  @Override
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    try {
+      String recordName = record.name();
+      String contextName = record.context();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(contextName);
+      sb.append('.');
+      sb.append(recordName);
+
+      appendPrefix(record, sb);
+      sb.append(".");
+      int sbBaseLen = sb.length();
+
+      Collection<AbstractMetric> metrics =
+        (Collection<AbstractMetric>) record.metrics();
+
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+      for (AbstractMetric metric : metrics) {
+        sb.append(metric.name());
+        String name = sb.toString();
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(name);
+        timelineMetric.setHostName(hostName);
+        timelineMetric.setAppId(serviceName);
+        timelineMetric.setStartTime(record.timestamp());
+        timelineMetric.setType(ClassUtils.getShortCanonicalName(
+          metric.value(), "Number"));
+        timelineMetric.getMetricValues().put(record.timestamp(),
+          metric.value().doubleValue());
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        // Retrieve all values from cache if it is time to send
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name);
+
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+
+        sb.setLength(sbBaseLen);
+      }
+
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+
+      if (!metricList.isEmpty()) {
+        emitMetrics(timelineMetrics);
+      }
+    } catch (IOException io) {
+      throw new MetricsException("Failed to putMetrics", io);
+    }
+  }
+
+  // Taken as is from Ganglia30 implementation
+  @InterfaceAudience.Private
+  public void appendPrefix(MetricsRecord record, StringBuilder sb) {
+    String contextName = record.context();
+    Collection<MetricsTag> tags = record.tags();
+    if (useTagsMap.containsKey(contextName)) {
+      Set<String> useTags = useTagsMap.get(contextName);
+      for (MetricsTag t : tags) {
+        if (useTags == null || useTags.contains(t.name())) {
+
+          // the context is always skipped here because it is always added
+
+          // the hostname is always skipped to avoid case-mismatches
+          // from different DNSes.
+
+          if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) {
+            sb.append('.').append(t.name()).append('=').append(t.value());
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void flush() {
+    // TODO: Buffering implementation
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
deleted file mode 100644
index 68b4be8..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.util.Map;
-import java.util.TreeMap;
-
-@XmlRootElement(name = "metric")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric> {
-
-  private String metricName;
-  private String appId;
-  private String instanceId;
-  private String hostName;
-  private long timestamp;
-  private long startTime;
-  private String type;
-  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
-
-  @XmlElement(name = "metricname")
-  public String getMetricName() {
-    return metricName;
-  }
-
-  public void setMetricName(String metricName) {
-    this.metricName = metricName;
-  }
-
-  @XmlElement(name = "appid")
-  public String getAppId() {
-    return appId;
-  }
-
-  public void setAppId(String appId) {
-    this.appId = appId;
-  }
-
-  @XmlElement(name = "instanceid")
-  public String getInstanceId() {
-    return instanceId;
-  }
-
-  public void setInstanceId(String instanceId) {
-    this.instanceId = instanceId;
-  }
-
-  @XmlElement(name = "hostname")
-  public String getHostName() {
-    return hostName;
-  }
-
-  public void setHostName(String hostName) {
-    this.hostName = hostName;
-  }
-
-  @XmlElement(name = "timestamp")
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  public void setTimestamp(long timestamp) {
-    this.timestamp = timestamp;
-  }
-
-  @XmlElement(name = "starttime")
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  @XmlElement(name = "type")
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  @XmlElement(name = "metrics")
-  public Map<Long, Double> getMetricValues() {
-    return metricValues;
-  }
-
-  public void setMetricValues(Map<Long, Double> metricValues) {
-    this.metricValues = metricValues;
-  }
-
-  public void addMetricValues(Map<Long, Double> metricValues) {
-    this.metricValues.putAll(metricValues);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    TimelineMetric metric = (TimelineMetric) o;
-
-    if (!metricName.equals(metric.metricName)) return false;
-    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
-      return false;
-    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
-      return false;
-    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-      return false;
-    if (timestamp != metric.timestamp) return false;
-    if (startTime != metric.startTime) return false;
-
-    return true;
-  }
-
-  public boolean equalsExceptTime(TimelineMetric metric) {
-    if (!metricName.equals(metric.metricName)) return false;
-    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
-      return false;
-    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
-      return false;
-    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-      return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = metricName.hashCode();
-    result = 31 * result + (appId != null ? appId.hashCode() : 0);
-    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
-    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
-    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-    return result;
-  }
-
-  @Override
-  public int compareTo(TimelineMetric other) {
-    if (timestamp > other.timestamp) {
-      return -1;
-    } else if (timestamp < other.timestamp) {
-      return 1;
-    } else {
-      return metricName.compareTo(other.metricName);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
deleted file mode 100644
index a6c925a..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The class that hosts a list of timeline entities.
- */
-@XmlRootElement(name = "metrics")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetrics {
-
-  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
-
-  public TimelineMetrics() {}
-
-  @XmlElement(name = "metrics")
-  public List<TimelineMetric> getMetrics() {
-    return allMetrics;
-  }
-
-  public void setMetrics(List<TimelineMetric> allMetrics) {
-    this.allMetrics = allMetrics;
-  }
-
-  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
-                                         TimelineMetric metric2) {
-
-    boolean isEqual = true;
-
-    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
-      return false;
-    }
-
-    if (metric1.getHostName() != null) {
-      isEqual = metric1.getHostName().equals(metric2.getHostName());
-    }
-
-    if (metric1.getAppId() != null) {
-      isEqual = metric1.getAppId().equals(metric2.getAppId());
-    }
-
-    return isEqual;
-  }
-
-  /**
-   * Merge with existing TimelineMetric if everything except startTime is
-   * the same.
-   * @param metric {@link TimelineMetric}
-   */
-  public void addOrMergeTimelineMetric(TimelineMetric metric) {
-    TimelineMetric metricToMerge = null;
-
-    if (!allMetrics.isEmpty()) {
-      for (TimelineMetric timelineMetric : allMetrics) {
-        if (timelineMetric.equalsExceptTime(metric)) {
-          metricToMerge = timelineMetric;
-          break;
-        }
-      }
-    }
-
-    if (metricToMerge != null) {
-      metricToMerge.addMetricValues(metric.getMetricValues());
-      if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
-        metricToMerge.setTimestamp(metric.getTimestamp());
-      }
-      if (metricToMerge.getStartTime() > metric.getStartTime()) {
-        metricToMerge.setStartTime(metric.getStartTime());
-      }
-    } else {
-      allMetrics.add(metric);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
deleted file mode 100644
index 36aaec2..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class TimelineMetricsCache {
-
-  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
-  private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
-  static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
-  static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
-  private final int maxRecsPerName;
-  private final int maxEvictionTimeInMillis;
-
-  TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
-    this.maxRecsPerName = maxRecsPerName;
-    this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
-  }
-
-  class TimelineMetricWrapper {
-    private long timeDiff = -1;
-    private long oldestTimestamp = -1;
-    private TimelineMetric timelineMetric;
-
-    TimelineMetricWrapper(TimelineMetric timelineMetric) {
-      this.timelineMetric = timelineMetric;
-      this.oldestTimestamp = timelineMetric.getStartTime();
-    }
-
-    private void updateTimeDiff(long timestamp) {
-      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
-        timeDiff = timestamp - oldestTimestamp;
-      } else {
-        oldestTimestamp = timestamp;
-      }
-    }
-
-    public void putMetric(TimelineMetric metric) {
-      this.timelineMetric.addMetricValues(metric.getMetricValues());
-      updateTimeDiff(metric.getStartTime());
-    }
-
-    public long getTimeDiff() {
-      return timeDiff;
-    }
-
-    public TimelineMetric getTimelineMetric() {
-      return timelineMetric;
-    }
-  }
-
-  // TODO: Change to ConcurentHashMap with weighted eviction
-  class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
-    private static final long serialVersionUID = 1L;
-    private boolean gotOverflow = false;
-
-    @Override
-    protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
-      boolean overflow = size() > maxRecsPerName;
-      if (overflow && !gotOverflow) {
-        LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest);
-        gotOverflow = true;
-      }
-      return overflow;
-    }
-
-    public TimelineMetric evict(String metricName) {
-      TimelineMetricWrapper metricWrapper = this.get(metricName);
-
-      if (metricWrapper == null
-        || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
-        return null;
-      }
-
-      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
-      this.remove(metricName);
-
-      return timelineMetric;
-    }
-
-    public void put(String metricName, TimelineMetric timelineMetric) {
-
-      TimelineMetricWrapper metric = this.get(metricName);
-      if (metric == null) {
-        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
-      } else {
-        metric.putMetric(timelineMetric);
-      }
-    }
-  }
-
-  public TimelineMetric getTimelineMetric(String metricName) {
-    if (timelineMetricCache.containsKey(metricName)) {
-      return timelineMetricCache.evict(metricName);
-    }
-
-    return null;
-  }
-
-  public void putTimelineMetric(TimelineMetric timelineMetric) {
-    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
-  }
-}