You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2015/03/03 17:18:10 UTC

ambari git commit: AMBARI-9511. Remove hadoop-common*.jar dep form sinks. (mpapyrkovskyy)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.0.0 24b2af40f -> 2f8801209


AMBARI-9511. Remove hadoop-common*.jar dep form sinks. (mpapyrkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2f880120
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2f880120
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2f880120

Branch: refs/heads/branch-2.0.0
Commit: 2f8801209443b57f150d079de646165f82b543fe
Parents: 24b2af4
Author: Myroslav Papirkovskyy <mp...@hortonworks.com>
Authored: Thu Feb 19 15:55:03 2015 +0200
Committer: Myroslav Papirkovskyy <mp...@hortonworks.com>
Committed: Tue Mar 3 18:02:33 2015 +0200

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-common/pom.xml    |  16 ++-
 .../timeline/AbstractTimelineMetricsSink.java   |   3 +
 .../metrics2/sink/timeline/TimelineMetric.java  |  10 +-
 .../metrics2/sink/timeline/TimelineMetrics.java |   9 +-
 .../timeline/cache/TimelineMetricsCache.java    |   9 +-
 .../timeline/configuration/Configuration.java   |   6 +-
 .../hadoop/metrics2/sink/util/Servers.java      | 111 +++++++++++++++++++
 .../cache/TimelineMetricsCacheTest.java         |   9 +-
 .../ambari-metrics-flume-sink/pom.xml           |   5 +
 .../sink/flume/FlumeTimelineMetricsSink.java    |  12 +-
 .../flume/FlumeTimelineMetricsSinkTest.java     |  12 +-
 .../ambari-metrics-hadoop-sink/pom.xml          |   5 +
 .../timeline/HadoopTimelineMetricsSink.java     |  27 +++--
 .../timeline/HadoopTimelineMetricsSinkTest.java |  41 +++----
 .../ambari-metrics-kafka-sink/pom.xml           |  12 +-
 .../kafka/KafkaTimelineMetricsReporter.java     |   2 +-
 .../ambari-metrics-storm-sink/pom.xml           |   5 +
 .../storm/StormTimelineMetricsReporter.java     |   3 +-
 .../sink/storm/StormTimelineMetricsSink.java    |   2 +-
 .../storm/StormTimelineMetricsSinkTest.java     |  20 ++--
 20 files changed, 240 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index 4658cfe..9cad5d4 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -41,14 +41,24 @@
       <version>1.1.1</version>
     </dependency>
     <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <version>3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-xc</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>2.4.0</version>
+      <artifactId>hadoop-annotations</artifactId>
+      <version>2.6.0</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-mapper-asl</artifactId>
-      <version>1.8.0</version>
+      <version>1.9.13</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 17560ac..4f5c6a1 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketAddress;
 
+import java.io.IOException;
+import java.net.SocketAddress;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 68b4be8..f482e54 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -17,14 +17,16 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import java.util.Map;
+import java.util.TreeMap;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.Map;
-import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 @XmlRootElement(name = "metric")
 @XmlAccessorType(XmlAccessType.NONE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
index 4355fb1..3eb0e89 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -17,15 +17,16 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.ArrayList;
-import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * The class that hosts a list of timeline entities.

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 0f2c9a3..224b490 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -17,12 +17,10 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline.cache;
 
-import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
 import java.util.HashMap;
@@ -160,7 +158,8 @@ public class TimelineMetricsCache {
     String metricName = timelineMetric.getMetricName();
     double firstValue = timelineMetric.getMetricValues().size() > 0
         ? timelineMetric.getMetricValues().entrySet().iterator().next().getValue() : 0;
-    double previousValue = Optional.fromNullable(counterMetricLastValue.get(metricName)).or(firstValue);
+    Double value = counterMetricLastValue.get(metricName);
+    double previousValue = value != null ? value : firstValue;
     Map<Long, Double> metricValues = timelineMetric.getMetricValues();
     Map<Long, Double>   newMetricValues = new TreeMap<Long, Double>();
     for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
@@ -171,8 +170,8 @@ public class TimelineMetricsCache {
     counterMetricLastValue.put(metricName, previousValue);
   }
 
-  public void putTimelineMetric(TimelineMetric timelineMetric, MetricType type) {
-    if (type == MetricType.COUNTER) {
+  public void putTimelineMetric(TimelineMetric timelineMetric, boolean isCounter) {
+    if (isCounter) {
       transformMetricValuesToDerivative(timelineMetric);
     }
     putTimelineMetric(timelineMetric);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
index 940ea75..a0380e1 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.metrics2.sink.timeline.configuration;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 public class Configuration {
   public final Log LOG = LogFactory.getLog(this.getClass());
   private final Properties properties;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java
new file mode 100644
index 0000000..76da0a2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.util;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Helpers to handle server addresses
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class Servers {
+  /**
+   * This class is not intended to be instantiated
+   */
+  private Servers() {}
+
+  /**
+   * Parses a space and/or comma separated sequence of server specifications
+   * of the form <i>hostname</i> or <i>hostname:port</i>.  If
+   * the specs string is null, defaults to localhost:defaultPort.
+   *
+   * @param specs   server specs (see description)
+   * @param defaultPort the default port if not specified
+   * @return a list of InetSocketAddress objects.
+   */
+  public static List<InetSocketAddress> parse(String specs, int defaultPort) {
+    List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
+    if (specs == null) {
+      result.add(new InetSocketAddress("localhost", defaultPort));
+    } else {
+      String[] specStrings = specs.split("[ ,]+");
+      for (String specString : specStrings) {
+        result.add(createSocketAddr(specString, defaultPort));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * @param host
+   * @param port
+   * @return a InetSocketAddress created with the specified host and port
+   */
+  private static InetSocketAddress createSocketAddr(String target, int defaultPort) {
+    String helpText = "";
+    if (target == null) {
+      throw new IllegalArgumentException("Target address cannot be null." + helpText);
+    }
+    boolean hasScheme = target.contains("://");
+    URI uri = null;
+    try {
+      uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://" + target);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Does not contain a valid host:port authority: " + target + helpText);
+    }
+
+    String host = uri.getHost();
+    int port = uri.getPort();
+    if (port == -1) {
+      port = defaultPort;
+    }
+    String path = uri.getPath();
+
+    if ((host == null) || (port < 0) || (!hasScheme && path != null && !path.isEmpty())) {
+      throw new IllegalArgumentException("Does not contain a valid host:port authority: " + target + helpText);
+    }
+    return createSocketAddrForHost(host, port);
+  }
+
+  /**
+   * @param host
+   * @param port
+   * @return a InetSocketAddress created with the specified host and port
+   */
+  private static InetSocketAddress createSocketAddrForHost(String host, int port) {
+    InetSocketAddress addr;
+    try {
+      InetAddress iaddr = InetAddress.getByName(host);
+      iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
+      addr = new InetSocketAddress(iaddr, port);
+    } catch (UnknownHostException e) {
+      addr = InetSocketAddress.createUnresolved(host, port);
+    }
+    return addr;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
index 8f07a27..4a13d63 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline.cache;
 
-import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.junit.Test;
 
@@ -41,13 +40,13 @@ public class TimelineMetricsCacheTest {
     TimelineMetric metric = createTimelineMetric(new TreeMap<Long, Double>() {{
       put(1L, 10.0);
     }}, DEFAULT_START_TIME);
-    timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER);
+    timelineMetricsCache.putTimelineMetric(metric, true);
     metric = createTimelineMetric(new TreeMap<Long, Double>() {{
       put(2L, 10.0);
       put(3L, 20.0);
       put(4L, 30.0);
     }}, DEFAULT_START_TIME + 2 * TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS);
-    timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER);
+    timelineMetricsCache.putTimelineMetric(metric, true);
     TimelineMetric cachedMetric
         = timelineMetricsCache.getTimelineMetric(METRIC_NAME);
     assertEquals(0, cachedMetric.getMetricValues().get(1L), delta);
@@ -60,12 +59,12 @@ public class TimelineMetricsCacheTest {
       put(6L, 120.0);
       put(7L, 230.0);
     }}, DEFAULT_START_TIME + 3 * TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS);
-    timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER);
+    timelineMetricsCache.putTimelineMetric(metric, true);
     metric = createTimelineMetric(new TreeMap<Long, Double>() {{
       put(8L, 300.0);
     }}, DEFAULT_START_TIME + 5 * TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS);
 
-    timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER);
+    timelineMetricsCache.putTimelineMetric(metric, true);
     cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME);
     assertEquals(70, cachedMetric.getMetricValues().get(5L), delta);
     assertEquals(20, cachedMetric.getMetricValues().get(6L), delta);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/pom.xml b/ambari-metrics/ambari-metrics-flume-sink/pom.xml
index f11b8b2..a83d7b0 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-flume-sink/pom.xml
@@ -108,6 +108,11 @@ limitations under the License.
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 0231b24..9e66c99 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -24,14 +24,13 @@ import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
-import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -48,6 +47,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+
+
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
   private SocketAddress socketAddress;
   private String collectorUri;
@@ -158,7 +159,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
           TimelineMetric timelineMetric = createTimelineMetric(currentTimeMillis,
               component, attributeName, attributeValue);
           // Put intermediate values into the cache until it is time to send
-          metricsCache.putTimelineMetric(timelineMetric, getMetricType(attributeName));
+          metricsCache.putTimelineMetric(timelineMetric, isCounterMetric(attributeName));
 
           TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName);
 
@@ -189,8 +190,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     }
   }
 
-  private MetricType getMetricType(String attributeName) {
-    return counterMetrics.contains(attributeName) ?
-      MetricType.COUNTER : MetricType.GAUGE;
+  private boolean isCounterMetric(String attributeName) {
+    return counterMetrics.contains(attributeName);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
index 90831bf..647e026 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.metrics2.sink.flume;
 
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+import java.util.Collections;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -27,10 +33,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.Collections;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JMXPollUtil.class)
@@ -102,4 +104,4 @@ public class FlumeTimelineMetricsSinkTest {
     collector.run();
     verifyAll();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
index 848a8f2..ee3bcd8 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
@@ -158,6 +158,11 @@ limitations under the License.
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 25058b3..06f6011 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -17,21 +17,33 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.lang.ClassUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.*;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.impl.MsInfo;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink {
@@ -164,7 +176,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
         timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number"));
         timelineMetric.getMetricValues().put(startTime, value.doubleValue());
         // Put intermediate values into the cache until it is time to send
-        metricsCache.putTimelineMetric(timelineMetric, metric.type());
+        boolean isCounter = MetricType.COUNTER == metric.type();
+        metricsCache.putTimelineMetric(timelineMetric, isCounter);
 
         // Retrieve all values from cache if it is time to send
         TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index d7b5d73..dddbbd0 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -18,43 +18,34 @@
 
 package org.apache.hadoop.metrics2.sink.timeline;
 
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IArgumentMatcher;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOST_PROPERTY;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reportMatcher;
 import static org.easymock.EasyMock.verify;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Test;
+
 public class HadoopTimelineMetricsSinkTest {
 
   @Test
@@ -249,4 +240,4 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(new Double(5.0), values.next());
     Assert.assertEquals(new Double(6.0), values.next());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
index e385935..2aefe1d 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
@@ -42,7 +42,7 @@ limitations under the License.
               <goal>copy-dependencies</goal>
             </goals>
             <configuration>
-              <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds>
+              <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc</includeArtifactIds>
               <outputDirectory>${project.build.directory}/lib</outputDirectory>
             </configuration>
           </execution>
@@ -130,6 +130,16 @@ limitations under the License.
       <version>2.2.0</version>
     </dependency>
     <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 63097e5..1f44494 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
index d069622..70bdec5 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -113,6 +113,11 @@ limitations under the License.
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index c969299..2d4baa3 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -29,8 +29,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.util.Servers;
-import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index ffcc6ed..dd0e72f 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.io.IOException;
 import java.net.InetAddress;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f880120/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 95a9329..15021e5 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,18 +18,24 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
-import backtype.storm.metric.api.IMetricsConsumer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Collections;
-
-import static org.easymock.EasyMock.*;
+import backtype.storm.metric.api.IMetricsConsumer;
 
 public class StormTimelineMetricsSinkTest {
   @Test
@@ -65,4 +71,4 @@ public class StormTimelineMetricsSinkTest {
         Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
     verify(timelineMetricsCache, httpClient);
   }
-}
\ No newline at end of file
+}