You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2014/12/05 20:42:13 UTC
[2/2] ambari git commit: AMBARI-8522. Enable Flume metrics sink to
AMS. (Szilard Nemethy via mpapirkovskyy)
AMBARI-8522. Enable Flume metrics sink to AMS. (Szilard Nemethy via mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b42150ca
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b42150ca
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b42150ca
Branch: refs/heads/trunk
Commit: b42150cad1fb21d777519e029154614072a5b219
Parents: 9cdd4e2
Author: Myroslav Papirkovskyy <mp...@hortonworks.com>
Authored: Fri Dec 5 21:34:58 2014 +0200
Committer: Myroslav Papirkovskyy <mp...@hortonworks.com>
Committed: Fri Dec 5 21:42:06 2014 +0200
----------------------------------------------------------------------
ambari-metrics/ambari-metrics-common/pom.xml | 81 +++++++
.../metrics2/sink/timeline/TimelineMetric.java | 172 +++++++++++++++
.../metrics2/sink/timeline/TimelineMetrics.java | 101 +++++++++
.../base/AbstractTimelineMetricsSink.java | 79 +++++++
.../timeline/cache/TimelineMetricsCache.java | 128 +++++++++++
.../timeline/configuration/Configuration.java | 62 ++++++
.../ambari-metrics-flume-sink/pom.xml | 181 ++++++++++++++++
.../src/main/assemblies/empty.xml | 21 ++
.../src/main/assemblies/sink.xml | 34 +++
.../src/main/conf/flume-metrics2.properties.j2 | 22 ++
.../sink/flume/FlumeTimelineMetricsSink.java | 176 ++++++++++++++++
.../flume/FlumeTimelineMetricsSinkTest.java | 117 ++++++++++
.../ambari-metrics-hadoop-sink/pom.xml | 12 +-
.../conf/hadoop-metrics2-hbase.properties.j2 | 8 +-
.../src/main/conf/hadoop-metrics2.properties.j2 | 2 +-
.../timeline/AbstractTimelineMetricsSink.java | 101 ---------
.../timeline/HadoopTimelineMetricsSink.java | 211 +++++++++++++++++++
.../metrics2/sink/timeline/TimelineMetric.java | 172 ---------------
.../metrics2/sink/timeline/TimelineMetrics.java | 102 ---------
.../sink/timeline/TimelineMetricsCache.java | 128 -----------
.../sink/timeline/TimelineMetricsSink.java | 211 -------------------
.../ambari-metrics-timelineservice/pom.xml | 2 +-
ambari-metrics/pom.xml | 2 +
23 files changed, 1399 insertions(+), 726 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
new file mode 100644
index 0000000..786ad93
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>ambari-metrics-common</artifactId>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.0</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>parse-version</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>parse-version</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>regex-property</id>
+ <goals>
+ <goal>regex-property</goal>
+ </goals>
+ <configuration>
+ <name>ambariVersion</name>
+ <value>${project.version}</value>
+ <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+ <replacement>$1.$2.$3</replacement>
+ <failIfNoMatch>false</failIfNoMatch>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.8.0</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
new file mode 100644
index 0000000..68b4be8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A named metric together with the attributes that identify its origin
+ * (application id, instance id, host name) and the values recorded for it.
+ *
+ * <p>Values are held in a map keyed by a {@code long}; the default map is a
+ * {@link TreeMap}, so iteration is in ascending key order.
+ * NOTE(review): keys are presumably epoch-millisecond timestamps - confirm
+ * against the producers of this class.
+ *
+ * <p>{@link #compareTo(TimelineMetric)} only looks at {@code timestamp} and
+ * {@code metricName}; it can therefore return 0 for two metrics that
+ * {@link #equals(Object)} reports as different (e.g. different host names).
+ */
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetric implements Comparable<TimelineMetric> {
+
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private long timestamp;
+  private long startTime;
+  private String type;
+  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  @XmlElement(name = "metricname")
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  @XmlElement(name = "appid")
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  @XmlElement(name = "instanceid")
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  @XmlElement(name = "hostname")
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @XmlElement(name = "starttime")
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /**
+   * @return the live value map (not a copy); changes made through the
+   *         returned reference are visible to this metric
+   */
+  @XmlElement(name = "metrics")
+  public Map<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  /**
+   * Replaces the value map with the given instance (no copy is made).
+   */
+  public void setMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  /**
+   * Copies all entries of {@code metricValues} into this metric's value map;
+   * an entry with an already-present key overwrites the existing value.
+   */
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+
+  /**
+   * Two metrics are equal when they are of the same class and agree on
+   * metric name, host name, app id, instance id, timestamp and start time.
+   * The {@code type} and the recorded values do not take part.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TimelineMetric metric = (TimelineMetric) o;
+    return hasSameIdentity(metric)
+        && timestamp == metric.timestamp
+        && startTime == metric.startTime;
+  }
+
+  /**
+   * Like {@link #equals(Object)} but ignoring both {@code timestamp} and
+   * {@code startTime}.
+   *
+   * @param metric the metric to compare with; {@code null} yields {@code false}
+   * @return {@code true} if metric name, host name, app id and instance id
+   *         all match
+   */
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    return metric != null && hasSameIdentity(metric);
+  }
+
+  /**
+   * Hash over metric name, app id, instance id, host name and timestamp.
+   * {@code startTime} is left out, which is still consistent with
+   * {@link #equals(Object)} (equal objects produce equal hashes).
+   */
+  @Override
+  public int hashCode() {
+    // metricName is null until setMetricName() is called; treat it like the
+    // other nullable fields instead of throwing.
+    int result = metricName != null ? metricName.hashCode() : 0;
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  /**
+   * Orders metrics by descending timestamp (newest first); metrics with the
+   * same timestamp are ordered by metric name.
+   */
+  @Override
+  public int compareTo(TimelineMetric other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else {
+      return metricName.compareTo(other.metricName);
+    }
+  }
+
+  /** Null-safe comparison of the four identifying string attributes. */
+  private boolean hasSameIdentity(TimelineMetric metric) {
+    return nullSafeEquals(metricName, metric.metricName)
+        && nullSafeEquals(hostName, metric.hostName)
+        && nullSafeEquals(appId, metric.appId)
+        && nullSafeEquals(instanceId, metric.instanceId);
+  }
+
+  /** {@code true} when both strings are null or both are equal. */
+  private static boolean nullSafeEquals(String a, String b) {
+    return a == null ? b == null : a.equals(b);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
new file mode 100644
index 0000000..4355fb1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Container for a list of {@link TimelineMetric} objects, exposed under the
+ * element name "metrics".
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetrics {
+
+  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+  public TimelineMetrics() {}
+
+  /**
+   * @return the live backing list (not a copy)
+   */
+  @XmlElement(name = "metrics")
+  public List<TimelineMetric> getMetrics() {
+    return allMetrics;
+  }
+
+  /**
+   * Replaces the backing list with the given instance (no copy is made).
+   */
+  public void setMetrics(List<TimelineMetric> allMetrics) {
+    this.allMetrics = allMetrics;
+  }
+
+  /**
+   * Compares two metrics by name and, when set on {@code metric1}, by host
+   * name and app id. An attribute that is {@code null} on {@code metric1} is
+   * not compared at all.
+   *
+   * @return {@code true} only if every compared attribute matches
+   */
+  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+                                         TimelineMetric metric2) {
+
+    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+      return false;
+    }
+
+    // Return on the first mismatch so that a later comparison cannot
+    // overwrite an earlier negative result.
+    if (metric1.getHostName() != null
+        && !metric1.getHostName().equals(metric2.getHostName())) {
+      return false;
+    }
+
+    if (metric1.getAppId() != null
+        && !metric1.getAppId().equals(metric2.getAppId())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Adds {@code metric} to the list, or merges it into the first existing
+   * entry for which {@link TimelineMetric#equalsExceptTime(TimelineMetric)}
+   * holds (same metric name, host name, app id and instance id; timestamp
+   * and start time are ignored).
+   *
+   * <p>On merge the incoming values are added to the existing entry, and the
+   * existing entry's timestamp and start time are lowered to the incoming
+   * ones if those are smaller.
+   *
+   * @param metric {@link TimelineMetric}
+   */
+  public void addOrMergeTimelineMetric(TimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    for (TimelineMetric timelineMetric : allMetrics) {
+      if (timelineMetric.equalsExceptTime(metric)) {
+        metricToMerge = timelineMetric;
+        break;
+      }
+    }
+
+    if (metricToMerge == null) {
+      allMetrics.add(metric);
+      return;
+    }
+
+    metricToMerge.addMetricValues(metric.getMetricValues());
+    // Keep the earliest timestamp / start time seen for the merged series.
+    if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+      metricToMerge.setTimestamp(metric.getTimestamp());
+    }
+    if (metricToMerge.getStartTime() > metric.getStartTime()) {
+      metricToMerge.setStartTime(metric.getStartTime());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
new file mode 100644
index 0000000..d51ee67
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.base;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/**
+ * Base class for metric sinks that serialize a {@link TimelineMetrics}
+ * object to JSON and POST it to the URI returned by
+ * {@link #getCollectorUri()}.
+ *
+ * <p>The string constants are configuration property keys; how they are
+ * read and interpreted is up to the concrete sinks and is not visible in
+ * this class.
+ */
+public abstract class AbstractTimelineMetricsSink {
+  public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
+  public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
+  public static final String METRICS_SEND_INTERVAL = "sendInterval";
+  public static final String COLLECTOR_HOST_PROPERTY = "collector";
+
+  protected final Log LOG = LogFactory.getLog(this.getClass());
+  private HttpClient httpClient = new HttpClient();
+
+  protected static ObjectMapper mapper;
+
+  static {
+    // JSON mapper driven by the JAXB annotations on the metric classes;
+    // null-valued properties are omitted from the output.
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+        .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  /**
+   * Serializes {@code metrics} to JSON and POSTs it to the collector.
+   * Nothing is sent when {@link #getServerSocketAddress()} returns
+   * {@code null}. A non-200 response is only logged, not thrown.
+   *
+   * @param metrics the metrics to send
+   * @throws IOException if serialization or the HTTP exchange fails
+   */
+  protected void emitMetrics(TimelineMetrics metrics) throws IOException {
+    String jsonData = mapper.writeValueAsString(metrics);
+
+    SocketAddress socketAddress = getServerSocketAddress();
+    if (socketAddress == null) {
+      return;
+    }
+
+    StringRequestEntity requestEntity =
+        new StringRequestEntity(jsonData, "application/json", "UTF-8");
+
+    PostMethod postMethod = new PostMethod(getCollectorUri());
+    postMethod.setRequestEntity(requestEntity);
+    try {
+      int statusCode = httpClient.executeMethod(postMethod);
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
+      } else {
+        LOG.debug("Metrics posted to Collector " + getCollectorUri());
+      }
+    } finally {
+      // Always hand the connection back to the connection manager, whether
+      // the request succeeded or threw; otherwise it is never reused/closed.
+      postMethod.releaseConnection();
+    }
+  }
+
+  /**
+   * Replaces the HTTP client used by {@link #emitMetrics(TimelineMetrics)}.
+   */
+  public void setHttpClient(HttpClient httpClient) {
+    this.httpClient = httpClient;
+  }
+
+  /**
+   * @return the collector address, or {@code null} to suppress sending
+   */
+  abstract protected SocketAddress getServerSocketAddress();
+
+  /**
+   * @return the URI metrics are POSTed to
+   */
+  abstract protected String getCollectorUri();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
new file mode 100644
index 0000000..06c3441
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.cache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Buffers {@link TimelineMetric} values per metric name and hands a buffered
+ * metric back only once the span between its oldest and newest start time
+ * has reached {@code maxEvictionTimeInMillis}.
+ *
+ * <p>The backing map is a plain {@link LinkedHashMap}; this class performs
+ * no synchronization of its own.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TimelineMetricsCache {
+
+  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
+  private static final Log LOG = LogFactory.getLog(TimelineMetricsCache.class);
+  public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
+  public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
+  // Despite the name, this is compared against the number of entries (distinct
+  // metric names) in the backing map, see removeEldestEntry().
+  private final int maxRecsPerName;
+  private final int maxEvictionTimeInMillis;
+
+  /**
+   * @param maxRecsPerName          maximum number of entries kept in the
+   *                                cache before the eldest entry is dropped
+   * @param maxEvictionTimeInMillis minimum time span a metric must cover
+   *                                before {@link #getTimelineMetric(String)}
+   *                                returns it
+   */
+  public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
+    this.maxRecsPerName = maxRecsPerName;
+    this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
+  }
+
+  /**
+   * A cached metric plus the bookkeeping needed to decide when it covers a
+   * long enough time span to be evicted.
+   */
+  class TimelineMetricWrapper {
+    private long timeDiff = -1;
+    private long oldestTimestamp = -1;
+    private TimelineMetric timelineMetric;
+
+    TimelineMetricWrapper(TimelineMetric timelineMetric) {
+      this.timelineMetric = timelineMetric;
+      this.oldestTimestamp = timelineMetric.getStartTime();
+    }
+
+    /**
+     * Records the span from the oldest start time to {@code timestamp} when
+     * {@code timestamp} is newer; otherwise adopts {@code timestamp} as the
+     * new oldest start time and leaves the span untouched.
+     */
+    private void updateTimeDiff(long timestamp) {
+      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+        timeDiff = timestamp - oldestTimestamp;
+      } else {
+        oldestTimestamp = timestamp;
+      }
+    }
+
+    /** Merges the values of {@code metric} into the wrapped metric. */
+    public void putMetric(TimelineMetric metric) {
+      this.timelineMetric.addMetricValues(metric.getMetricValues());
+      updateTimeDiff(metric.getStartTime());
+    }
+
+    /** @return the recorded time span, or -1 if none has been recorded yet */
+    public long getTimeDiff() {
+      return timeDiff;
+    }
+
+    public TimelineMetric getTimelineMetric() {
+      return timelineMetric;
+    }
+  }
+
+  // TODO: Change to ConcurrentHashMap with weighted eviction
+  /**
+   * Insertion-ordered map from metric name to its wrapper that drops the
+   * eldest entry once more than {@code maxRecsPerName} entries are present.
+   */
+  class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
+    private static final long serialVersionUID = 1L;
+    // Ensures the overflow warning is logged only once per holder.
+    private boolean gotOverflow = false;
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
+      boolean overflow = size() > maxRecsPerName;
+      if (overflow && !gotOverflow) {
+        LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest);
+        gotOverflow = true;
+      }
+      return overflow;
+    }
+
+    /**
+     * Removes and returns the metric cached under {@code metricName}.
+     *
+     * @return the cached metric, or {@code null} if there is none or its
+     *         time span is still below {@code maxEvictionTimeInMillis}
+     */
+    public TimelineMetric evict(String metricName) {
+      TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+      if (metricWrapper == null
+          || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
+        return null;
+      }
+
+      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+      this.remove(metricName);
+
+      return timelineMetric;
+    }
+
+    /**
+     * Caches {@code timelineMetric} under {@code metricName}, merging its
+     * values into an already cached metric of the same name.
+     */
+    public void put(String metricName, TimelineMetric timelineMetric) {
+
+      TimelineMetricWrapper metric = this.get(metricName);
+      if (metric == null) {
+        // Resolves to the inherited LinkedHashMap.put(String, wrapper).
+        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+      } else {
+        metric.putMetric(timelineMetric);
+      }
+    }
+  }
+
+  /**
+   * Returns and removes the cached metric for {@code metricName} if it has
+   * accumulated a long enough time span.
+   *
+   * @return the metric, or {@code null} if nothing is cached under that name
+   *         or the cached metric is not yet due for eviction
+   */
+  public TimelineMetric getTimelineMetric(String metricName) {
+    // evict() already returns null for an unknown name.
+    return timelineMetricCache.evict(metricName);
+  }
+
+  /**
+   * Adds {@code timelineMetric} to the cache, keyed by its metric name.
+   */
+  public void putTimelineMetric(TimelineMetric timelineMetric) {
+    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
new file mode 100644
index 0000000..940ea75
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline.configuration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Read-only view of a {@code .properties} resource loaded from the classpath.
+ */
+public class Configuration {
+  public final Log LOG = LogFactory.getLog(this.getClass());
+  private final Properties properties;
+
+  /**
+   * Loads the given classpath resource as a properties file.
+   *
+   * <p>The name is resolved with {@link Class#getResourceAsStream(String)}
+   * on this class, so a name without a leading {@code '/'} is looked up
+   * relative to this class's package.
+   *
+   * @param configFile resource name of the properties file
+   * @throws IllegalArgumentException if the resource does not exist or
+   *                                  cannot be read
+   */
+  public Configuration(String configFile) {
+    properties = new Properties();
+
+    //Get property file stream from classpath
+    InputStream inputStream = Configuration.class.getResourceAsStream(configFile);
+
+    if (inputStream == null) {
+      throw new IllegalArgumentException(configFile + " not found in classpath");
+    }
+
+    // load the properties
+    try {
+      properties.load(inputStream);
+    } catch (FileNotFoundException fnf) {
+      LOG.info("No configuration file " + configFile + " found in classpath.", fnf);
+    } catch (IOException ie) {
+      throw new IllegalArgumentException("Can't read configuration file " +
+          configFile, ie);
+    } finally {
+      // Close on every path, including a failed load, so the stream is not
+      // leaked. A failure to close after the data was read is not fatal.
+      try {
+        inputStream.close();
+      } catch (IOException closeFailure) {
+        LOG.warn("Failed to close configuration file " + configFile, closeFailure);
+      }
+    }
+  }
+
+  /**
+   * @return the value for {@code key}, or {@code null} if it is not set
+   */
+  public String getProperty(String key) {
+    return properties.getProperty(key);
+  }
+
+  /**
+   * @return the value for {@code key}, or {@code defaultValue} if it is not set
+   */
+  public String getProperty(String key, String defaultValue) {
+    return properties.getProperty(key, defaultValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/pom.xml b/ambari-metrics/ambari-metrics-flume-sink/pom.xml
new file mode 100644
index 0000000..3021438
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>ambari-metrics-flume-sink</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assemblies/sink.xml</descriptor>
+ </descriptors>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build-tarball</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.0</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>parse-version</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>parse-version</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>regex-property</id>
+ <goals>
+ <goal>regex-property</goal>
+ </goals>
+ <configuration>
+ <name>ambariVersion</name>
+ <value>${project.version}</value>
+ <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+ <replacement>$1.$2.$3</replacement>
+ <failIfNoMatch>false</failIfNoMatch>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.github.goldin</groupId>
+ <artifactId>copy-maven-plugin</artifactId>
+ <version>0.2.5</version>
+ <executions>
+ <execution>
+ <id>create-archive</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>rpm-maven-plugin</artifactId>
+ <version>2.0.1</version>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ <goals>
+ <goal>rpm</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <name>ambari-metrics-flume-sink</name>
+ <copyright>2012, Apache Software Foundation</copyright>
+ <group>Development</group>
+ <description>Maven Recipe: RPM Package.</description>
+ <mappings>
+ <mapping>
+ <directory>/usr/lib/flume/lib</directory>
+ <filemode>644</filemode>
+ <username>root</username>
+ <groupname>root</groupname>
+ <sources>
+ <source>
+ <location>target/${project.artifactId}-${project.version}.jar</location>
+ </source>
+ <source>
+ <location>target/lib</location>
+ </source>
+ </sources>
+ </mapping>
+ </mappings>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>1.5.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <version>1.4.9</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.4.9</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly>
+ <id>empty</id>
+ <formats/>
+</assembly>
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml
new file mode 100644
index 0000000..21a6b36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly>
+ <!--This 'dist' id is not appended to the produced bundle because we do this:
+ http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers
+ -->
+ <id>dist</id>
+ <formats>
+ <format>dir</format>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <files>
+ <file>
+ <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source>
+ <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory>
+ </file>
+ </files>
+</assembly>
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
new file mode 100644
index 0000000..7458bf8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
@@ -0,0 +1,22 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+collector={{metric_collector_host}}:8188
+collectionFrequency=60000
+maxRowCacheSize=10000
+sendInterval=59000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
new file mode 100644
index 0000000..87c4ab8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.flume;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+import org.apache.hadoop.metrics2.util.Servers;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
+ private SocketAddress socketAddress;
+ private String collectorUri;
+ private TimelineMetricsCache metricsCache;
+ private ScheduledExecutorService scheduledExecutorService;
+ private long pollFrequency;
+ private String hostname;
+
+ @Override
+ public void start() {
+ LOG.info("Starting Flume Metrics Sink");
+ TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector();
+ if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) {
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ }
+ scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0,
+ pollFrequency, TimeUnit.MILLISECONDS);
+ }
+
+  /**
+   * Stops periodic metric collection by shutting down the scheduler.
+   * Safe to call when {@link #start()} was never invoked.
+   */
+  @Override
+  public void stop() {
+    LOG.info("Stopping Flume Metrics Sink");
+    // The executor is created lazily in start(); it is still null if the
+    // monitor is stopped before it was ever started.
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdown();
+    }
+  }
+
+  /**
+   * Initializes the sink: resolves the local host name and loads the cache
+   * size, send interval, collector address and poll frequency from
+   * {@code /flume-metrics2.properties}. The Flume {@code context} is only
+   * logged; no setting is read from it.
+   *
+   * @param context the Flume context passed to this monitor service
+   * @throws FlumeException if the local host name cannot be determined
+   */
+  @Override
+  public void configure(Context context) {
+    LOG.info("Context parameters " + context);
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Could not identify hostname.");
+      throw new FlumeException("Could not identify hostname.", e);
+    }
+    Configuration configuration = new Configuration("/flume-metrics2.properties");
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+
+    // Read the collector address once and use that same value for both the
+    // URI and the socket address. The previous code fed the property *value*
+    // back into getProperty() as a key, so the configured host never reached
+    // Servers.parse().
+    String collectorHosts = configuration.getProperty(COLLECTOR_HOST_PROPERTY);
+    collectorUri = "http://" + collectorHosts + "/ws/v1/timeline/metrics";
+    List<InetSocketAddress> socketAddresses = Servers.parse(collectorHosts, 8188);
+    if (socketAddresses != null && !socketAddresses.isEmpty()) {
+      socketAddress = socketAddresses.get(0);
+    }
+    pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
+  }
+
+  /**
+   * @return the first collector socket address resolved in
+   *         {@link #configure(Context)}, or {@code null} if none was resolved
+   */
+  @Override
+  public SocketAddress getServerSocketAddress() {
+    return socketAddress;
+  }
+
+  /**
+   * @return the collector URI built in {@link #configure(Context)}
+   */
+  @Override
+  public String getCollectorUri() {
+    return collectorUri;
+  }
+
+  /**
+   * Overrides the delay, in milliseconds, between two collection runs.
+   *
+   * @param pollFrequency delay between runs in milliseconds
+   */
+  public void setPollFrequency(long pollFrequency) {
+    this.pollFrequency = pollFrequency;
+  }
+
+  /**
+   * Replaces the cache that buffers metric values between sends.
+   *
+   * @param metricsCache the cache to use from now on
+   */
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+ /**
+ * Worker which polls JMX for all mbeans with
+ * {@link javax.management.ObjectName} within the flume namespace:
+ * org.apache.flume. All attributes of such beans are sent
+ * to the metrics collector service.
+ */
+ private class TimelineMetricsCollector implements Runnable {
+ @Override
+ public void run() {
+ LOG.debug("Collecting Metrics for Flume");
+ try {
+ Map<String, Map<String, String>> metricsMap =
+ JMXPollUtil.getAllMBeans();
+ long currentTimeMillis = System.currentTimeMillis();
+ for (String component : metricsMap.keySet()) {
+ Map<String, String> attributeMap = metricsMap.get(component);
+ LOG.info("Attributes for component " + component);
+ processComponentAttributes(currentTimeMillis, component, attributeMap);
+ }
+ } catch (Exception e) {
+ LOG.error("Unexpected error", e);
+ }
+ LOG.debug("Finished collecting Metrics for Flume");
+ }
+
+ private void processComponentAttributes(long currentTimeMillis, String component, Map<String, String> attributeMap) throws IOException {
+ List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+ for (String attributeName : attributeMap.keySet()) {
+ String attributeValue = attributeMap.get(attributeName);
+ if (NumberUtils.isNumber(attributeValue)) {
+ LOG.info(attributeName + " = " + attributeValue);
+ TimelineMetric timelineMetric = createTimelineMetric(currentTimeMillis,
+ component, attributeName, attributeValue);
+ // Put intermediate values into the cache until it is time to send
+ metricsCache.putTimelineMetric(timelineMetric);
+
+ TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName);
+
+ if (cachedMetric != null) {
+ metricList.add(cachedMetric);
+ }
+ }
+ }
+
+ if (!metricList.isEmpty()) {
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(metricList);
+ emitMetrics(timelineMetrics);
+ }
+ }
+
+ private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(attributeName);
+ timelineMetric.setHostName(hostname);
+ timelineMetric.setAppId("flume." + component);
+ timelineMetric.setStartTime(currentTimeMillis);
+ timelineMetric.setType(ClassUtils.getShortCanonicalName(
+ attributeValue, "Number"));
+ timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+ return timelineMetric;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
new file mode 100644
index 0000000..0275db6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.flume;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+
+import static org.powermock.api.easymock.PowerMock.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JMXPollUtil.class)
+public class FlumeTimelineMetricsSinkTest {
+ @Test
+ public void testNonNumericMetricMetricExclusion() throws InterruptedException {
+ FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+ TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+ flumeTimelineMetricsSink.setPollFrequency(1);
+ HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+ flumeTimelineMetricsSink.setHttpClient(httpClient);
+ mockStatic(JMXPollUtil.class);
+ EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
+ Collections.singletonMap("component1", Collections.singletonMap("key1", "value1"))).once();
+ replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+ flumeTimelineMetricsSink.start();
+ Thread.sleep(5);
+ flumeTimelineMetricsSink.stop();
+ verifyAll();
+ }
+
+ @Test
+ public void testNumericMetricMetricSubmission() throws InterruptedException {
+ FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+ TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+ flumeTimelineMetricsSink.setPollFrequency(1);
+ HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+ flumeTimelineMetricsSink.setHttpClient(httpClient);
+ mockStatic(JMXPollUtil.class);
+ EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
+ Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
+ replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+ flumeTimelineMetricsSink.start();
+ Thread.sleep(5);
+ flumeTimelineMetricsSink.stop();
+ verifyAll();
+ }
+
+ private TimelineMetricsCache getTimelineMetricsCache(FlumeTimelineMetricsSink flumeTimelineMetricsSink) {
+ TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
+ flumeTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+ EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1"))
+ .andReturn(new TimelineMetric()).once();
+ timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
+ EasyMock.expectLastCall().once();
+ return timelineMetricsCache;
+ }
+
+ @Test
+ public void testMonitorRestart() throws InterruptedException {
+ FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+ TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+ flumeTimelineMetricsSink.setPollFrequency(1);
+ HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+ flumeTimelineMetricsSink.setHttpClient(httpClient);
+ mockStatic(JMXPollUtil.class);
+ EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
+ Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
+ flumeTimelineMetricsSink.start();
+ flumeTimelineMetricsSink.stop();
+ replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+ flumeTimelineMetricsSink.start();
+ Thread.sleep(5);
+ flumeTimelineMetricsSink.stop();
+ verifyAll();
+ }
+
+ @Test
+ public void testMetricsRetrievalExceptionTolerance() throws InterruptedException {
+ FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
+ TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
+ flumeTimelineMetricsSink.setPollFrequency(1);
+ HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+ flumeTimelineMetricsSink.setHttpClient(httpClient);
+ mockStatic(JMXPollUtil.class);
+ EasyMock.expect(JMXPollUtil.getAllMBeans()).
+ andThrow(new RuntimeException("Failed to retrieve Flume Properties")).once();
+ replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
+ flumeTimelineMetricsSink.start();
+ Thread.sleep(5);
+ flumeTimelineMetricsSink.stop();
+ verifyAll();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
index 0397e2e..1e854e2 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
@@ -133,6 +133,11 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.0</version>
@@ -176,12 +181,7 @@ limitations under the License.
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
- <version>1.9.9</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
+ <version>1.8.0</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
index 6e64421..0d13498 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2
@@ -29,21 +29,21 @@
hbase.extendedperiod = 3600
# Configuration of the "hbase" context for timeline metrics service
-hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
hbase.period=10
hbase.collector={{timeline_server_hosts}}:8188
# Configuration of the "jvm" context for timeline metrics service
-jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+jvm.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
jvm.period=10
jvm.collector={{timeline_server_hosts}}:8188
# Configuration of the "rpc" context for timeline metrics service
-rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+rpc.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
rpc.period=10
rpc.collector={{timeline_server_hosts}}:8188
# Following hadoop example
-hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
hbase.sink.timeline.period=10
hbase.sink.timeline.collector={{timeline_server_hosts}}:8188
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
index 7a00a7e..76a00d1 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2
@@ -37,7 +37,7 @@
{% if has_ganglia_server %}
*.period=60
-*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
*.sink.timeline.period=10
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
deleted file mode 100644
index 2c42274..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.MetricsSink;
-import org.apache.hadoop.metrics2.util.Servers;
-import org.apache.hadoop.net.DNS;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-
-public abstract class AbstractTimelineMetricsSink implements MetricsSink {
-
- public final Log LOG = LogFactory.getLog(this.getClass());
-
- private SubsetConfiguration conf;
- private String hostName = "UNKNOWN.example.com";
- private String serviceName = "";
- private final String COLLECTOR_HOST_PROPERTY = "collector";
- private final int DEFAULT_PORT = 8188;
-
- private List<? extends SocketAddress> metricsServers;
- private String collectorUri;
-
- @Override
- public void init(SubsetConfiguration conf) {
- this.conf = conf;
- LOG.info("Initializing Timeline metrics sink.");
-
- // Take the hostname from the DNS class.
- if (conf.getString("slave.host.name") != null) {
- hostName = conf.getString("slave.host.name");
- } else {
- try {
- hostName = DNS.getDefaultHost(
- conf.getString("dfs.datanode.dns.interface", "default"),
- conf.getString("dfs.datanode.dns.nameserver", "default"));
- } catch (UnknownHostException uhe) {
- LOG.error(uhe);
- hostName = "UNKNOWN.example.com";
- }
- }
-
- serviceName = getFirstConfigPrefix(conf);
-
- // Load collector configs
- metricsServers = Servers.parse(conf.getString(COLLECTOR_HOST_PROPERTY),
- DEFAULT_PORT);
-
- if (metricsServers == null || metricsServers.isEmpty()) {
- LOG.error("No Metric collector configured.");
- } else {
- collectorUri = "http://" + conf.getString(COLLECTOR_HOST_PROPERTY).trim()
- + "/ws/v1/timeline/metrics";
- }
- }
-
- protected String getHostName() {
- return hostName;
- }
-
- protected String getServiceName() {
- return serviceName;
- }
-
- private String getFirstConfigPrefix(SubsetConfiguration conf) {
- while (conf.getParent() instanceof SubsetConfiguration) {
- conf = (SubsetConfiguration) conf.getParent();
- }
- return conf.getPrefix();
- }
-
- protected SocketAddress getServerSocketAddress() {
- if (metricsServers != null && !metricsServers.isEmpty()) {
- return metricsServers.get(0);
- }
- return null;
- }
-
- protected String getCollectorUri() {
- return collectorUri;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
new file mode 100644
index 0000000..8fcf464
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.*;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink {
+ private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
+ private TimelineMetricsCache metricsCache;
+ private String hostName = "UNKNOWN.example.com";
+ private String serviceName = "";
+ private List<? extends SocketAddress> metricsServers;
+ private String collectorUri;
+
+  /**
+   * Initializes the sink from the metrics2 configuration subset:
+   * <ul>
+   *   <li>host name: {@code slave.host.name} if set, otherwise resolved via
+   *       {@link DNS#getDefaultHost(String, String)}, falling back to
+   *       {@code UNKNOWN.example.com};</li>
+   *   <li>service name: prefix of the outermost {@link SubsetConfiguration};</li>
+   *   <li>collector address and URI (default port 8188);</li>
+   *   <li>metrics cache size and send interval;</li>
+   *   <li>per-context tag selection from every key starting with
+   *       {@code TAGS_FOR_PREFIX_PROPERTY_PREFIX}.</li>
+   * </ul>
+   *
+   * @param conf the configuration subset for this sink
+   */
+  @Override
+  public void init(SubsetConfiguration conf) {
+    LOG.info("Initializing Timeline metrics sink.");
+
+    // Take the hostname from the DNS class.
+    if (conf.getString("slave.host.name") != null) {
+      hostName = conf.getString("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+            conf.getString("dfs.datanode.dns.interface", "default"),
+            conf.getString("dfs.datanode.dns.nameserver", "default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+        hostName = "UNKNOWN.example.com";
+      }
+    }
+
+    serviceName = getFirstConfigPrefix(conf);
+
+    // Load collector configs. The property is read once and null-checked:
+    // if it is unset and Servers.parse falls back to a default address,
+    // the old second lookup followed by trim() would dereference null.
+    String collectorHosts = conf.getString(COLLECTOR_HOST_PROPERTY);
+    metricsServers = Servers.parse(collectorHosts, 8188);
+
+    if (collectorHosts == null || metricsServers == null || metricsServers.isEmpty()) {
+      LOG.error("No Metric collector configured.");
+    } else {
+      collectorUri = "http://" + collectorHosts.trim()
+          + "/ws/v1/timeline/metrics";
+    }
+
+    int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE,
+        TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
+    int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
+        TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+
+    conf.setListDelimiter(',');
+    @SuppressWarnings("unchecked") // configuration keys are strings
+    Iterator<String> it = (Iterator<String>) conf.getKeys();
+    while (it.hasNext()) {
+      String propertyName = it.next();
+      if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
+        String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
+        String[] tags = conf.getStringArray(propertyName);
+        boolean useAllTags = false;
+        // A null set stored for a context means "use all tags" (see appendPrefix).
+        Set<String> set = null;
+        if (tags.length > 0) {
+          set = new HashSet<String>();
+          for (String tag : tags) {
+            tag = tag.trim();
+            useAllTags |= tag.equals("*");
+            if (tag.length() > 0) {
+              set.add(tag);
+            }
+          }
+          if (useAllTags) {
+            set = null;
+          }
+        }
+        useTagsMap.put(contextName, set);
+      }
+    }
+  }
+
+  /**
+   * Walks up the chain of parent configurations for as long as the parent is
+   * itself a {@link SubsetConfiguration} and returns the prefix of the last
+   * one reached.
+   *
+   * @param conf the configuration subset to start from
+   * @return the prefix of the outermost enclosing subset configuration
+   */
+  private String getFirstConfigPrefix(SubsetConfiguration conf) {
+    SubsetConfiguration outermost = conf;
+    for (Object parent = outermost.getParent();
+         parent instanceof SubsetConfiguration;
+         parent = outermost.getParent()) {
+      outermost = (SubsetConfiguration) parent;
+    }
+    return outermost.getPrefix();
+  }
+
+  /**
+   * @return the first collector address parsed in
+   *         {@link #init(SubsetConfiguration)}, or {@code null} if there is none
+   */
+  @Override
+  protected SocketAddress getServerSocketAddress() {
+    if (metricsServers != null && !metricsServers.isEmpty()) {
+      return metricsServers.get(0);
+    }
+    return null;
+  }
+
+  /**
+   * @return the collector URI built in {@link #init(SubsetConfiguration)};
+   *         {@code null} if no collector was configured
+   */
+  @Override
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+
+  /**
+   * Converts every metric of the record into a {@link TimelineMetric} named
+   * {@code <context>.<record><tag prefix>.<metric>}, puts it into the cache
+   * and emits whatever the cache hands back for sending.
+   *
+   * @param record the metrics record to publish
+   * @throws MetricsException if emitting the metrics fails with an I/O error
+   */
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    try {
+      String recordName = record.name();
+      String contextName = record.context();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(contextName);
+      sb.append('.');
+      sb.append(recordName);
+
+      appendPrefix(record, sb);
+      sb.append(".");
+      // Everything up to here is shared by all metrics of this record.
+      int sbBaseLen = sb.length();
+
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+      // Iterate the Iterable directly: down-casting it to Collection fails
+      // for records whose metrics() is not backed by a Collection.
+      for (AbstractMetric metric : record.metrics()) {
+        sb.append(metric.name());
+        String name = sb.toString();
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(name);
+        timelineMetric.setHostName(hostName);
+        timelineMetric.setAppId(serviceName);
+        timelineMetric.setStartTime(record.timestamp());
+        timelineMetric.setType(ClassUtils.getShortCanonicalName(
+            metric.value(), "Number"));
+        timelineMetric.getMetricValues().put(record.timestamp(),
+            metric.value().doubleValue());
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        // Retrieve all values from cache if it is time to send
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name);
+
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+
+        // Drop the metric name again so the builder can be reused.
+        sb.setLength(sbBaseLen);
+      }
+
+      if (!metricList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricList);
+        emitMetrics(timelineMetrics);
+      }
+    } catch (IOException io) {
+      throw new MetricsException("Failed to putMetrics", io);
+    }
+  }
+
+ // Taken as is from Ganglia30 implementation
+ @InterfaceAudience.Private
+ public void appendPrefix(MetricsRecord record, StringBuilder sb) {
+ String contextName = record.context();
+ Collection<MetricsTag> tags = record.tags();
+ if (useTagsMap.containsKey(contextName)) {
+ Set<String> useTags = useTagsMap.get(contextName);
+ for (MetricsTag t : tags) {
+ if (useTags == null || useTags.contains(t.name())) {
+
+ // the context is always skipped here because it is always added
+
+ // the hostname is always skipped to avoid case-mismatches
+ // from different DNSes.
+
+ if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) {
+ sb.append('.').append(t.name()).append('=').append(t.value());
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void flush() {
+ // TODO: Buffering implementation
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
deleted file mode 100644
index 68b4be8..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * JAXB-annotated bean describing one metric: its identifying fields (metric
- * name, app id, instance id, host name), a timestamp, a start time, a type
- * string and a map of metric values.
- *
- * <p>{@link #equals(Object)} compares the identifying fields plus timestamp
- * and start time, {@link #hashCode()} leaves the start time out, and
- * {@link #compareTo(TimelineMetric)} only looks at timestamp and metric name.
- * The metric name must be set before any of these three methods is called;
- * a {@code null} name results in a {@link NullPointerException}.
- */
-@XmlRootElement(name = "metric")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric> {
-
-  private String metricName;
-  private String appId;
-  private String instanceId;
-  private String hostName;
-  private long timestamp;
-  private long startTime;
-  private String type;
-  // Sorted map (TreeMap) of Long keys to metric values.
-  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
-
-  /** @return the metric name, exposed as XML element {@code metricname} */
-  @XmlElement(name = "metricname")
-  public String getMetricName() {
-    return metricName;
-  }
-
-  public void setMetricName(String metricName) {
-    this.metricName = metricName;
-  }
-
-  /** @return the application id, exposed as XML element {@code appid} */
-  @XmlElement(name = "appid")
-  public String getAppId() {
-    return appId;
-  }
-
-  public void setAppId(String appId) {
-    this.appId = appId;
-  }
-
-  /** @return the instance id, exposed as XML element {@code instanceid} */
-  @XmlElement(name = "instanceid")
-  public String getInstanceId() {
-    return instanceId;
-  }
-
-  public void setInstanceId(String instanceId) {
-    this.instanceId = instanceId;
-  }
-
-  /** @return the host name, exposed as XML element {@code hostname} */
-  @XmlElement(name = "hostname")
-  public String getHostName() {
-    return hostName;
-  }
-
-  public void setHostName(String hostName) {
-    this.hostName = hostName;
-  }
-
-  /** @return the timestamp, exposed as XML element {@code timestamp} */
-  @XmlElement(name = "timestamp")
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  public void setTimestamp(long timestamp) {
-    this.timestamp = timestamp;
-  }
-
-  /** @return the start time, exposed as XML element {@code starttime} */
-  @XmlElement(name = "starttime")
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  /** @return the type string, exposed as XML element {@code type} */
-  @XmlElement(name = "type")
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  /**
-   * @return the live (not copied) map of metric values, exposed as XML
-   *         element {@code metrics}
-   */
-  @XmlElement(name = "metrics")
-  public Map<Long, Double> getMetricValues() {
-    return metricValues;
-  }
-
-  /** Replaces the value map with the given map instance (no copy is made). */
-  public void setMetricValues(Map<Long, Double> metricValues) {
-    this.metricValues = metricValues;
-  }
-
-  /** Copies every entry of the given map into this metric's value map. */
-  public void addMetricValues(Map<Long, Double> metricValues) {
-    this.metricValues.putAll(metricValues);
-  }
-
-  /** Null-tolerant string equality: two nulls are equal, null never equals non-null. */
-  private static boolean sameOrBothNull(String left, String right) {
-    return left == null ? right == null : left.equals(right);
-  }
-
-  /** Hash of a possibly-null string; {@code null} hashes to 0. */
-  private static int hashOrZero(String value) {
-    return value == null ? 0 : value.hashCode();
-  }
-
-  /**
-   * Two metrics are equal when they are of the same class and agree on metric
-   * name, host name, app id, instance id, timestamp and start time.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (o == null || o.getClass() != getClass()) {
-      return false;
-    }
-
-    final TimelineMetric that = (TimelineMetric) o;
-    return metricName.equals(that.metricName)
-        && sameOrBothNull(hostName, that.hostName)
-        && sameOrBothNull(appId, that.appId)
-        && sameOrBothNull(instanceId, that.instanceId)
-        && timestamp == that.timestamp
-        && startTime == that.startTime;
-  }
-
-  /**
-   * Identity comparison that ignores both time fields: only metric name,
-   * host name, app id and instance id are compared.
-   *
-   * @param metric the metric to compare with; must not be {@code null}
-   * @return {@code true} when the four identifying fields match
-   */
-  public boolean equalsExceptTime(TimelineMetric metric) {
-    return metricName.equals(metric.metricName)
-        && sameOrBothNull(hostName, metric.hostName)
-        && sameOrBothNull(appId, metric.appId)
-        && sameOrBothNull(instanceId, metric.instanceId);
-  }
-
-  /** Combines metric name, app id, instance id, host name and timestamp. */
-  @Override
-  public int hashCode() {
-    int hash = metricName.hashCode();
-    hash = 31 * hash + hashOrZero(appId);
-    hash = 31 * hash + hashOrZero(instanceId);
-    hash = 31 * hash + hashOrZero(hostName);
-    // fold the 64-bit timestamp into 32 bits
-    hash = 31 * hash + (int) (timestamp ^ (timestamp >>> 32));
-    return hash;
-  }
-
-  /**
-   * Orders metrics by descending timestamp (a larger timestamp sorts first);
-   * equal timestamps fall back to the natural order of the metric names.
-   */
-  @Override
-  public int compareTo(TimelineMetric other) {
-    if (timestamp != other.timestamp) {
-      return timestamp > other.timestamp ? -1 : 1;
-    }
-    return metricName.compareTo(other.metricName);
-  }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
deleted file mode 100644
index a6c925a..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Container for a list of {@link TimelineMetric} objects, exposed through
- * JAXB as the XML element {@code metrics}.
- */
-@XmlRootElement(name = "metrics")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetrics {
-
-  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
-
-  public TimelineMetrics() {}
-
-  /** @return the live (not copied) list of metrics held by this container */
-  @XmlElement(name = "metrics")
-  public List<TimelineMetric> getMetrics() {
-    return allMetrics;
-  }
-
-  /** Replaces the backing list with the given list instance (no copy is made). */
-  public void setMetrics(List<TimelineMetric> allMetrics) {
-    this.allMetrics = allMetrics;
-  }
-
-  /**
-   * Loose equality check: the metric names must match, and the host name and
-   * app id of {@code metric1} must each match {@code metric2} when they are
-   * set on {@code metric1} (an unset field on {@code metric1} is not compared).
-   *
-   * <p>Each comparison returns early on a mismatch, so a host-name mismatch
-   * can no longer be masked by a matching app id.
-   */
-  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
-                                         TimelineMetric metric2) {
-
-    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
-      return false;
-    }
-
-    if (metric1.getHostName() != null
-        && !metric1.getHostName().equals(metric2.getHostName())) {
-      return false;
-    }
-
-    if (metric1.getAppId() != null
-        && !metric1.getAppId().equals(metric2.getAppId())) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * Adds the metric to this container, or merges it into the first metric
-   * already present that matches it according to
-   * {@link TimelineMetric#equalsExceptTime(TimelineMetric)}.
-   *
-   * <p>On a merge the values of {@code metric} are copied into the existing
-   * metric, and the existing metric's timestamp and start time are each
-   * lowered to the incoming value when the incoming value is smaller.
-   *
-   * @param metric {@link TimelineMetric}
-   */
-  public void addOrMergeTimelineMetric(TimelineMetric metric) {
-    TimelineMetric metricToMerge = null;
-
-    // Iterating an empty list is a no-op, so no separate emptiness check is needed.
-    for (TimelineMetric timelineMetric : allMetrics) {
-      if (timelineMetric.equalsExceptTime(metric)) {
-        metricToMerge = timelineMetric;
-        break;
-      }
-    }
-
-    if (metricToMerge != null) {
-      metricToMerge.addMetricValues(metric.getMetricValues());
-      // keep the earliest timestamp / start time of the two
-      if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
-        metricToMerge.setTimestamp(metric.getTimestamp());
-      }
-      if (metricToMerge.getStartTime() > metric.getStartTime()) {
-        metricToMerge.setStartTime(metric.getStartTime());
-      }
-    } else {
-      allMetrics.add(metric);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
deleted file mode 100644
index 36aaec2..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Cache of {@link TimelineMetric} objects keyed by metric name. Values put
- * under the same name are accumulated into one metric, which is handed out
- * (and removed) by {@link #getTimelineMetric(String)} only once the recorded
- * time span for that name reaches {@code maxEvictionTimeInMillis}.
- *
- * <p>The backing store is a plain {@link LinkedHashMap}; this class performs
- * no synchronization of its own.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class TimelineMetricsCache {
-
-  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
-  // Logger is bound to this class so overflow warnings show up under the cache's own category.
-  private static final Log LOG = LogFactory.getLog(TimelineMetricsCache.class);
-  static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
-  static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
-  // Upper bound on the number of entries (metric names) kept in the holder map.
-  private final int maxRecsPerName;
-  // Minimum time span, in milliseconds, a cached metric must cover before it is handed out.
-  private final int maxEvictionTimeInMillis;
-
-  /**
-   * @param maxRecsPerName          entry count above which the eldest cache entry is dropped
-   * @param maxEvictionTimeInMillis time span a cached metric must reach before it is returned
-   */
-  TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
-    this.maxRecsPerName = maxRecsPerName;
-    this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
-  }
-
-  /**
-   * Pairs a cached metric with the time span between the oldest start time
-   * recorded for it and the start time of a later put.
-   */
-  class TimelineMetricWrapper {
-    private long timeDiff = -1;
-    private long oldestTimestamp = -1;
-    private TimelineMetric timelineMetric;
-
-    TimelineMetricWrapper(TimelineMetric timelineMetric) {
-      this.timelineMetric = timelineMetric;
-      this.oldestTimestamp = timelineMetric.getStartTime();
-    }
-
-    /**
-     * Updates the covered span when {@code timestamp} is later than the
-     * recorded oldest timestamp; otherwise the given timestamp becomes the
-     * new oldest timestamp and the span is left as it was.
-     */
-    private void updateTimeDiff(long timestamp) {
-      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
-        timeDiff = timestamp - oldestTimestamp;
-      } else {
-        oldestTimestamp = timestamp;
-      }
-    }
-
-    /** Merges the values of {@code metric} into the wrapped metric and refreshes the span. */
-    public void putMetric(TimelineMetric metric) {
-      this.timelineMetric.addMetricValues(metric.getMetricValues());
-      updateTimeDiff(metric.getStartTime());
-    }
-
-    /** @return the covered span in milliseconds, or -1 if none has been computed yet */
-    public long getTimeDiff() {
-      return timeDiff;
-    }
-
-    public TimelineMetric getTimelineMetric() {
-      return timelineMetric;
-    }
-  }
-
-  // TODO: Change to ConcurrentHashMap with weighted eviction
-  /**
-   * Insertion-ordered map from metric name to its wrapper. The eldest entry
-   * is dropped whenever the size exceeds {@code maxRecsPerName}.
-   */
-  class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
-    private static final long serialVersionUID = 1L;
-    // Ensures the overflow warning is logged only once per holder.
-    private boolean gotOverflow = false;
-
-    @Override
-    protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
-      boolean overflow = size() > maxRecsPerName;
-      if (overflow && !gotOverflow) {
-        LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest);
-        gotOverflow = true;
-      }
-      return overflow;
-    }
-
-    /**
-     * Removes and returns the metric cached under {@code metricName}, but only
-     * if its covered span has reached {@code maxEvictionTimeInMillis}.
-     *
-     * @return the cached metric, or {@code null} when the name is unknown or
-     *         the span is still too short (the entry then stays cached)
-     */
-    public TimelineMetric evict(String metricName) {
-      TimelineMetricWrapper metricWrapper = this.get(metricName);
-
-      if (metricWrapper == null
-          || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) {
-        return null;
-      }
-
-      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
-      this.remove(metricName);
-
-      return timelineMetric;
-    }
-
-    /**
-     * Caches the metric under {@code metricName}: a new wrapper is created for
-     * an unknown name, otherwise the values are merged into the existing one.
-     */
-    public void put(String metricName, TimelineMetric timelineMetric) {
-
-      TimelineMetricWrapper metric = this.get(metricName);
-      if (metric == null) {
-        // resolves to the inherited LinkedHashMap.put(String, TimelineMetricWrapper)
-        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
-      } else {
-        metric.putMetric(timelineMetric);
-      }
-    }
-  }
-
-  /**
-   * Returns and removes the metric cached under {@code metricName} once it is
-   * due for sending.
-   *
-   * @return the cached metric, or {@code null} if nothing is cached under that
-   *         name or the cached metric is not yet due
-   */
-  public TimelineMetric getTimelineMetric(String metricName) {
-    if (timelineMetricCache.containsKey(metricName)) {
-      return timelineMetricCache.evict(metricName);
-    }
-
-    return null;
-  }
-
-  /** Adds the metric to the cache under its own metric name. */
-  public void putTimelineMetric(TimelineMetric timelineMetric) {
-    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
-  }
-}