You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/01/08 18:19:35 UTC

[09/28] ambari git commit: AMBARI-22740 : Fix integration test for HBase in branch-3.0-ams due to UUID changes. (avijayan)

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
deleted file mode 100644
index d3da500..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-@Singleton
-@Path("/ws/v1/timeline")
-public class TimelineWebServices {
-  private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
-  
-  private TimelineMetricStore timelineMetricStore;
-
-  @Inject
-  public TimelineWebServices(TimelineMetricStore timelineMetricStore) {
-    this.timelineMetricStore = timelineMetricStore;
-  }
-
-  @XmlRootElement(name = "about")
-  @XmlAccessorType(XmlAccessType.NONE)
-  @Public
-  @Unstable
-  public static class AboutInfo {
-
-    private String about;
-
-    public AboutInfo() {
-
-    }
-
-    public AboutInfo(String about) {
-      this.about = about;
-    }
-
-    @XmlElement(name = "About")
-    public String getAbout() {
-      return about;
-    }
-
-    public void setAbout(String about) {
-      this.about = about;
-    }
-
-  }
-
-  /**
-   * Return the description of the timeline web services.
-   */
-  @GET
-  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public AboutInfo about(
-      @Context HttpServletRequest req,
-      @Context HttpServletResponse res) {
-    init(res);
-    return new AboutInfo("AMS API");
-  }
-
-  /**
-   * Store the given metrics into the timeline store, and return errors that
-   * happened during storing.
-   */
-  @Path("/metrics")
-  @POST
-  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public TimelinePutResponse postMetrics(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    TimelineMetrics metrics) {
-
-    init(res);
-    if (metrics == null) {
-      return new TimelinePutResponse();
-    }
-
-    try {
-
-      // TODO: Check ACLs for MetricEntity using the TimelineACLManager.
-      // TODO: Save owner of the MetricEntity.
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing metrics: " +
-          TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
-      }
-
-      return timelineMetricStore.putMetrics(metrics);
-
-    } catch (Exception e) {
-      LOG.error("Error saving metrics.", e);
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  /**
-   * Store the given metrics into the timeline store, and return errors that
-   * happened during storing.
-   */
-  @Path("/metrics/aggregated")
-  @POST
-  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public TimelinePutResponse postAggregatedMetrics(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    AggregationResult metrics) {
-
-    init(res);
-    if (metrics == null) {
-      return new TimelinePutResponse();
-    }
-
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing aggregated metrics: " +
-                TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
-      }
-
-      return timelineMetricStore.putHostAggregatedMetrics(metrics);
-    } catch (Exception e) {
-      LOG.error("Error saving metrics.", e);
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  @Path("/containermetrics")
-  @POST
-  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public TimelinePutResponse postContainerMetrics(
-      @Context HttpServletRequest req,
-      @Context HttpServletResponse res,
-      List<ContainerMetric> metrics) {
-    init(res);
-    if (metrics == null || metrics.isEmpty()) {
-      return new TimelinePutResponse();
-    }
-
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing container metrics: " + TimelineUtils
-            .dumpTimelineRecordtoJSON(metrics, true));
-      }
-
-      return timelineMetricStore.putContainerMetrics(metrics);
-
-    } catch (Exception e) {
-      LOG.error("Error saving metrics.", e);
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  /**
-   * Query for a set of different metrics satisfying the filter criteria.
-   * All query params are optional. The default limit will apply if none
-   * specified.
-   *
-   * @param metricNames Comma separated list of metrics to retrieve.
-   * @param appId Application Id for the requested metrics.
-   * @param instanceId Application instance id.
-   * @param hostname Hostname where the metrics originated.
-   * @param startTime Start time for the metric records retrieved.
-   * @param precision Precision [ seconds, minutes, hours ]
-   * @param limit limit on total number of {@link TimelineMetric} records
-   *              retrieved.
-   * @return {@link @TimelineMetrics}
-   */
-  @GET
-  @Path("/metrics")
-  @Produces({ MediaType.APPLICATION_JSON })
-  public TimelineMetrics getTimelineMetrics(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    @QueryParam("metricNames") String metricNames,
-    @QueryParam("appId") String appId,
-    @QueryParam("instanceId") String instanceId,
-    @QueryParam("hostname") String hostname,
-    @QueryParam("startTime") String startTime,
-    @QueryParam("endTime") String endTime,
-    @QueryParam("precision") String precision,
-    @QueryParam("limit") String limit,
-    @QueryParam("grouped") String grouped,
-    @QueryParam("topN") String topN,
-    @QueryParam("topNFunction") String topNFunction,
-    @QueryParam("isBottomN") String isBottomN,
-    @QueryParam("seriesAggregateFunction") String seriesAggregateFunction
-  ) {
-    init(res);
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for metrics => metricNames: " + metricNames + ", " +
-          "appId: " + appId + ", instanceId: " + instanceId + ", " +
-          "hostname: " + hostname + ", startTime: " + startTime + ", " +
-          "endTime: " + endTime + ", " +
-          "precision: " + precision + "seriesAggregateFunction: " + seriesAggregateFunction);
-      }
-
-      return timelineMetricStore.getTimelineMetrics(
-        parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId),
-        parseLongStr(startTime), parseLongStr(endTime),
-        Precision.getPrecision(precision), parseIntStr(limit),
-        parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
-        seriesAggregateFunction);
-
-    } catch (NumberFormatException ne) {
-      throw new BadRequestException("startTime and limit should be numeric " +
-        "values");
-    } catch (Precision.PrecisionFormatException pfe) {
-      throw new BadRequestException("precision should be seconds, minutes " +
-        "or hours");
-    } catch (PrecisionLimitExceededException iae) {
-      throw new PrecisionLimitExceededException(iae.getMessage());
-    } catch (IllegalArgumentException iae) {
-      throw new BadRequestException(iae.getMessage());
-    } catch (SQLException | IOException e) {
-      throw new WebApplicationException(e,
-        Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  @GET
-  @Path("/metrics/metadata")
-  @Produces({ MediaType.APPLICATION_JSON })
-  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    @QueryParam("appId") String appId,
-    @QueryParam("metricName") String metricPattern,
-    @QueryParam("includeAll") String includeBlacklistedMetrics
-    ) {
-    init(res);
-
-    try {
-      return timelineMetricStore.getTimelineMetricMetadata(
-        parseStr(appId),
-        parseStr(metricPattern),
-        parseBoolean(includeBlacklistedMetrics));
-    } catch (Exception e) {
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  @GET
-  @Path("/metrics/hosts")
-  @Produces({ MediaType.APPLICATION_JSON })
-  public Map<String, Set<String>> getHostedAppsMetadata(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res
-  ) {
-    init(res);
-
-    try {
-      return timelineMetricStore.getHostAppsMetadata();
-    } catch (Exception e) {
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  @GET
-  @Path("/metrics/instances")
-  @Produces({ MediaType.APPLICATION_JSON })
-  public Map<String, Map<String, Set<String>>> getClusterHostsMetadata(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    @QueryParam("appId") String appId,
-    @QueryParam("instanceId") String instanceId
-  ) {
-    init(res);
-
-    try {
-      return timelineMetricStore.getInstanceHostsMetadata(instanceId, appId);
-    } catch (Exception e) {
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  @GET
-  @Path("/metrics/uuid")
-  @Produces({ MediaType.APPLICATION_JSON })
-  public byte[] getUuid(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    @QueryParam("metricName") String metricName,
-    @QueryParam("appId") String appId,
-    @QueryParam("instanceId") String instanceId,
-    @QueryParam("hostname") String hostname
-    ) {
-    init(res);
-
-    try {
-      return timelineMetricStore.getUuid(metricName, appId, instanceId, hostname);
-    } catch (Exception e) {
-      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  /**
-   * This is a discovery endpoint that advertises known live collector
-   * instances. Note: It will always answer with current instance as live.
-   * This can be utilized as a liveliness pinger endpoint since the instance
-   * names are cached and thereby no synchronous calls result from this API
-   *
-   * @return List<String> hostnames</String>
-   */
-  @GET
-  @Path("/metrics/livenodes")
-  @Produces({ MediaType.APPLICATION_JSON })
-  public List<String> getLiveCollectorNodes(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res
-  ) {
-    init(res);
-
-    return timelineMetricStore.getLiveInstances();
-  }
-
-  private void init(HttpServletResponse response) {
-    response.setContentType(null);
-  }
-
-  private static SortedSet<String> parseArrayStr(String str, String delimiter) {
-    if (str == null) {
-      return null;
-    }
-    SortedSet<String> strSet = new TreeSet<String>();
-    String[] strs = str.split(delimiter);
-    for (String aStr : strs) {
-      strSet.add(aStr.trim());
-    }
-    return strSet;
-  }
-
-  private static NameValuePair parsePairStr(String str, String delimiter) {
-    if (str == null) {
-      return null;
-    }
-    String[] strs = str.split(delimiter, 2);
-    try {
-      return new NameValuePair(strs[0].trim(),
-          GenericObjectMapper.OBJECT_READER.readValue(strs[1].trim()));
-    } catch (Exception e) {
-      // didn't work as an Object, keep it as a String
-      return new NameValuePair(strs[0].trim(), strs[1].trim());
-    }
-  }
-
-  private static Collection<NameValuePair> parsePairsStr(
-      String str, String aDelimiter, String pDelimiter) {
-    if (str == null) {
-      return null;
-    }
-    String[] strs = str.split(aDelimiter);
-    Set<NameValuePair> pairs = new HashSet<NameValuePair>();
-    for (String aStr : strs) {
-      pairs.add(parsePairStr(aStr, pDelimiter));
-    }
-    return pairs;
-  }
-
-  private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
-    if (str == null) {
-      return null;
-    }
-    String[] strs = str.split(delimiter);
-    List<Field> fieldList = new ArrayList<Field>();
-    for (String s : strs) {
-      s = s.trim().toUpperCase();
-      if (s.equals("EVENTS")) {
-        fieldList.add(Field.EVENTS);
-      } else if (s.equals("LASTEVENTONLY")) {
-        fieldList.add(Field.LAST_EVENT_ONLY);
-      } else if (s.equals("RELATEDENTITIES")) {
-        fieldList.add(Field.RELATED_ENTITIES);
-      } else if (s.equals("PRIMARYFILTERS")) {
-        fieldList.add(Field.PRIMARY_FILTERS);
-      } else if (s.equals("OTHERINFO")) {
-        fieldList.add(Field.OTHER_INFO);
-      } else {
-        throw new IllegalArgumentException("Requested nonexistent field " + s);
-      }
-    }
-    if (fieldList.size() == 0) {
-      return null;
-    }
-    Field f1 = fieldList.remove(fieldList.size() - 1);
-    if (fieldList.size() == 0) {
-      return EnumSet.of(f1);
-    } else {
-      return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()]));
-    }
-  }
-
-  private static Long parseLongStr(String str) {
-    return str == null ? null : Long.parseLong(str.trim());
-  }
-
-  private static Integer parseIntStr(String str) {
-    return str == null ? null : Integer.parseInt(str.trim());
-  }
-
-  private static boolean parseBoolean(String booleanStr) {
-    return booleanStr == null || Boolean.parseBoolean(booleanStr);
-  }
-
-  private static TopNConfig parseTopNConfig(String topN, String topNFunction,
-                                            String bottomN) {
-    if (topN == null || topN.isEmpty()) {
-      return null;
-    }
-    Integer topNValue = parseIntStr(topN);
-
-    if (topNValue == 0) {
-      LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
-      return null;
-    }
-
-    Boolean isBottomN = (bottomN != null && Boolean.parseBoolean(bottomN));
-    return new TopNConfig(topNValue, topNFunction, isBottomN);
-  }
-
-  /**
-   * Parses delimited string to list of strings. It skips strings that are
-   * effectively empty (i.e. only whitespace).
-   *
-   */
-  private static List<String> parseListStr(String str, String delimiter) {
-    if (str == null || str.trim().isEmpty()){
-      return null;
-    }
-
-    String[] split = str.trim().split(delimiter);
-    List<String> list = new ArrayList<String>(split.length);
-    for (String s : split) {
-      if (!s.trim().isEmpty()){
-        list.add(s);
-      }
-    }
-
-    return list;
-  }
-
-  private static String parseStr(String str) {
-    String trimmedInstance = (str == null) ? null : str.trim();
-    if (trimmedInstance != null) {
-      if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) {
-        trimmedInstance = null;
-      }
-    }
-    return trimmedInstance;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py b/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
index b6b4e0b..5faeffc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
@@ -39,13 +39,13 @@ SERVER_START_CMD = \
   "-cp {0} {1} " + \
   "-Djava.net.preferIPv4Stack=true " \
   "-Dproc_timelineserver " + \
-  "org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer"
+  "org.apache.ambari.metrics.AMSApplicationServer"
 SERVER_START_CMD_DEBUG = \
   "-cp {0} {1} " + \
   "-Djava.net.preferIPv4Stack=true " \
   "-Dproc_timelineserver " + \
   " -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend={2} " + \
-  "org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer"
+  "org.apache.ambari.metrics.AMSApplicationServer"
 
 AMC_DIE_MSG = "Ambari Metrics Collector java process died with exitcode {0}. Check {1} for more information."
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java
new file mode 100644
index 0000000..89a5759
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.data;
+
+import org.apache.ambari.metrics.core.loadsimulator.util.Json;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestAppMetrics { // Verifies that AppMetrics serializes to the exact JSON text expected below.
+  private static final String SAMPLE_SINGLE_METRIC_HOST_JSON = "{\n" + // expected JSON: one metric (disk_free), starttime 1411663170112
+    "  \"metrics\" : [ {\n" +
+    "    \"instanceid\" : \"\",\n" +
+    "    \"hostname\" : \"localhost\",\n" +
+    "    \"metrics\" : {\n" +
+    "      \"0\" : \"5.35\",\n" +
+    "      \"5000\" : \"5.35\",\n" +
+    "      \"10000\" : \"5.35\",\n" +
+    "      \"15000\" : \"5.35\"\n" +
+    "    },\n" +
+    "    \"starttime\" : \"1411663170112\",\n" +
+    "    \"appid\" : \"HOST\",\n" +
+    "    \"metricname\" : \"disk_free\"\n" +
+    "  } ]\n" +
+    "}";
+
+  private static final String SAMPLE_TWO_METRIC_HOST_JSON = "{\n" + // expected JSON: two metrics (disk_free, mem_cached), starttime 0
+    "  \"metrics\" : [ {\n" +
+    "    \"instanceid\" : \"\",\n" +
+    "    \"hostname\" : \"localhost\",\n" +
+    "    \"metrics\" : {\n" +
+    "      \"0\" : \"5.35\",\n" +
+    "      \"5000\" : \"5.35\",\n" +
+    "      \"10000\" : \"5.35\",\n" +
+    "      \"15000\" : \"5.35\"\n" +
+    "    },\n" +
+    "    \"starttime\" : \"0\",\n" +
+    "    \"appid\" : \"HOST\",\n" +
+    "    \"metricname\" : \"disk_free\"\n" +
+    "  }, {\n" +
+    "    \"instanceid\" : \"\",\n" +
+    "    \"hostname\" : \"localhost\",\n" +
+    "    \"metrics\" : {\n" +
+    "      \"0\" : \"94.0\",\n" +
+    "      \"5000\" : \"94.0\",\n" +
+    "      \"10000\" : \"94.0\",\n" +
+    "      \"15000\" : \"94.0\"\n" +
+    "    },\n" +
+    "    \"starttime\" : \"0\",\n" +
+    "    \"appid\" : \"HOST\",\n" +
+    "    \"metricname\" : \"mem_cached\"\n" +
+    "  } ]\n" +
+    "}";
+
+  private long[] timestamps; // sample timestamps; re-created by setUp() before every test
+
+  @Before
+  public void setUp() throws Exception { // builds four timestamps: 0, 5000, 10000, 15000
+    timestamps = new long[4];
+    timestamps[0] = 0;
+    timestamps[1] = timestamps[0] + 5000; // each entry is the previous one plus 5000;
+    timestamps[2] = timestamps[1] + 5000; // the resulting values are the keys of the
+    timestamps[3] = timestamps[2] + 5000; // "metrics" maps in the expected JSON constants
+
+  }
+
+  @Test
+  public void testHostDiskMetricsSerialization() throws IOException { // one metric -> SAMPLE_SINGLE_METRIC_HOST_JSON
+    long timestamp = 1411663170112L; // same value as "starttime" in the expected JSON
+    AppMetrics appMetrics = new AppMetrics(new ApplicationInstance("localhost", AppID.HOST, ""), timestamp); // NOTE(review): args presumably (hostname, appId, instanceId) - confirm against ApplicationInstance
+
+    Metric diskFree = appMetrics.createMetric("disk_free");
+    double value = 5.35; // Double.toString(5.35) is "5.35", as in the expected JSON
+
+    diskFree.putMetric(timestamps[0], Double.toString(value));
+    diskFree.putMetric(timestamps[1], Double.toString(value));
+    diskFree.putMetric(timestamps[2], Double.toString(value));
+    diskFree.putMetric(timestamps[3], Double.toString(value));
+
+    appMetrics.addMetric(diskFree);
+
+    String expected = SAMPLE_SINGLE_METRIC_HOST_JSON;
+    String s = new Json(true).serialize(appMetrics); // NOTE(review): `true` presumably enables pretty-printing (expected text is multi-line) - confirm against Json
+
+    assertEquals("Serialized Host Metrics", expected, s); // exact String comparison: field order and whitespace must match
+  }
+
+
+  @Test
+  public void testSingleHostManyMetricsSerialization() throws IOException { // two metrics -> SAMPLE_TWO_METRIC_HOST_JSON
+    AppMetrics appMetrics = new AppMetrics(new ApplicationInstance("localhost", AppID.HOST, ""), timestamps[0]); // timestamps[0] is 0, matching "starttime" : "0"
+
+    Metric diskFree = appMetrics.createMetric("disk_free");
+    double value = 5.35;
+    diskFree.putMetric(timestamps[0], Double.toString(value));
+    diskFree.putMetric(timestamps[1], Double.toString(value));
+    diskFree.putMetric(timestamps[2], Double.toString(value));
+    diskFree.putMetric(timestamps[3], Double.toString(value));
+
+    appMetrics.addMetric(diskFree);
+
+    Metric memCache = appMetrics.createMetric("mem_cached");
+    double memVal = 94; // Double.toString(94) is "94.0", as in the expected JSON
+    memCache.putMetric(timestamps[0], Double.toString(memVal));
+    memCache.putMetric(timestamps[1], Double.toString(memVal));
+    memCache.putMetric(timestamps[2], Double.toString(memVal));
+    memCache.putMetric(timestamps[3], Double.toString(memVal));
+
+    appMetrics.addMetric(memCache);
+
+    String expected = SAMPLE_TWO_METRIC_HOST_JSON;
+    String s = new Json(true).serialize(appMetrics);
+
+    assertEquals("Serialized Host Metrics", expected, s); // exact String comparison: metric order (disk_free, then mem_cached) matters
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java
new file mode 100644
index 0000000..79e4b8f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.data;
+
+import org.apache.ambari.metrics.core.loadsimulator.util.Json;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+import static org.junit.Assert.assertEquals;
+
+public class TestMetric { // Tests Metric <-> JSON conversion through the Json helper, in both directions.
+  private static final String SAMPLE_METRIC_IN_JSON = "{\n" + // expected serializer output and deserializer input
+    "  \"instanceid\" : \"\",\n" +
+    "  \"hostname\" : \"localhost\",\n" +
+    "  \"metrics\" : {\n" +
+    "    \"0\" : \"5.35\",\n" +
+    "    \"5000\" : \"5.35\",\n" +
+    "    \"10000\" : \"5.35\",\n" +
+    "    \"15000\" : \"5.35\"\n" +
+    "  },\n" +
+    "  \"starttime\" : \"0\",\n" +
+    "  \"appid\" : \"HOST\",\n" +
+    "  \"metricname\" : \"disk_free\"\n" +
+    "}";
+
+  @Test
+  public void testSerializeToJson() throws IOException { // Metric with four samples -> SAMPLE_METRIC_IN_JSON
+    Metric diskOnHostMetric = new Metric(new ApplicationInstance("localhost", AppID.HOST, ""), "disk_free", 0); // NOTE(review): trailing 0 presumably is the start time ("starttime" : "0") - confirm against Metric
+
+    long timestamp = 0;
+    double value = 5.35; // Double.toString(5.35) is "5.35", as in the expected JSON
+
+    diskOnHostMetric.putMetric(timestamp, Double.toString(value));
+    diskOnHostMetric.putMetric(timestamp + 5000, Double.toString(value));
+    diskOnHostMetric.putMetric(timestamp + 10000, Double.toString(value));
+    diskOnHostMetric.putMetric(timestamp + 15000, Double.toString(value));
+
+    String expected = SAMPLE_METRIC_IN_JSON;
+    String s = new Json(true).serialize(diskOnHostMetric); // NOTE(review): `true` presumably enables pretty-printing (expected text is multi-line) - confirm against Json
+
+    assertEquals("Json should match", expected, s); // exact String comparison: field order and whitespace must match
+  }
+
+  @Test
+  public void testDeserializeObjectFromString() throws IOException { // SAMPLE_METRIC_IN_JSON -> Metric, then check every field
+    String source = SAMPLE_METRIC_IN_JSON;
+
+    Metric m = new Json().deserialize(source, Metric.class);
+
+    assertEquals("localhost", m.getHostname());
+    assertEquals("HOST", m.getAppid());
+    assertEquals("", m.getInstanceid());
+    assertEquals("disk_free", m.getMetricname());
+    assertEquals("0", m.getStarttime()); // the start time is compared as a String here
+
+    assertThat(m.getMetrics()).isNotEmpty().hasSize(4).contains( // size 4 plus these four entries => exactly these samples
+      entry("0", "5.35"),
+      entry("5000", "5.35"),
+      entry("10000", "5.35"),
+      entry("15000", "5.35"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java
new file mode 100644
index 0000000..8d2d38e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ambari.metrics.core.loadsimulator.MetricsLoadSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class AMSJMeterLoadTest {
+
+  private final static Logger LOG = LoggerFactory.getLogger(AMSJMeterLoadTest.class);
+  // Default properties resource, used when the arguments do not name a properties file.
+  private static final String PROPERTIES_FILE = "loadsimulator/ams-jmeter.properties";
+  // Thread pool for the GET ("U") simulation; the constructor leaves it null in PUT mode.
+  private ScheduledExecutorService scheduledExecutorService = null;
+  // Per-app GET request definitions; only populated by the constructor in GET ("U") mode.
+  private List<AppGetMetric> appGetMetrics;
+  // Settings loaded by the constructor from the default or user-supplied properties file.
+  private Properties amsJmeterProperties = null;
+
+  /**
+   * Loads the simulator settings and immediately starts the requested simulation.
+   * <p/>
+   * When {@code args.get("type")} is {@code "U"} the GET (UI) simulator is scheduled on a
+   * thread pool; any other value (including none) starts the PUT simulator through
+   * {@link MetricsLoadSimulator#startTest}. For the PUT simulator each setting is taken
+   * from {@code args} when present and from the properties file otherwise.
+   *
+   * @param args option name to value map; keys read here are "type",
+   *             "amsJmeterPropertiesFile", "host-prefix", "min-host-index", "num-hosts",
+   *             "ams-host-port", "collection-interval", "send-interval" and "create-master"
+   * @throws IllegalStateException if the properties file could not be loaded
+   */
+  public AMSJMeterLoadTest(Map<String, String> args) {
+
+    String testType = args.get("type");
+    String userDefinedPropertiesFile = args.get("amsJmeterPropertiesFile");
+    String propertiesFile = (null == userDefinedPropertiesFile || userDefinedPropertiesFile.isEmpty())
+      ? PROPERTIES_FILE
+      : userDefinedPropertiesFile;
+
+    this.amsJmeterProperties = readProperties(propertiesFile);
+    if (this.amsJmeterProperties == null) {
+      // readProperties reports failure with null; fail here with a clear message
+      // instead of a bare NullPointerException on the first getProperty call.
+      throw new IllegalStateException("Unable to load AMS JMeter properties from " + propertiesFile);
+    }
+
+    if ("U".equals(testType)) { //GET metrics simulator
+      int numInstances = Integer.parseInt(amsJmeterProperties.getProperty("num-ui-instances"));
+      this.scheduledExecutorService = Executors.newScheduledThreadPool(numInstances);
+      this.appGetMetrics = initializeGetMetricsPayload(amsJmeterProperties);
+      this.runTest(numInstances);
+    } else {                    //PUT Metrics simulator
+      Map<String, String> mapArgs = new HashMap<String, String>();
+      mapArgs.put("hostName", argOrProperty(args, "host-prefix"));
+      mapArgs.put("minHostIndex", argOrProperty(args, "min-host-index"));
+      mapArgs.put("numberOfHosts", argOrProperty(args, "num-hosts"));
+      mapArgs.put("metricsHostName", argOrProperty(args, "ams-host-port"));
+      mapArgs.put("collectInterval", argOrProperty(args, "collection-interval"));
+      mapArgs.put("sendInterval", argOrProperty(args, "send-interval"));
+      mapArgs.put("master", argOrProperty(args, "create-master"));
+      System.out.println("AMS Load Simulation Parameters : " + mapArgs);
+      MetricsLoadSimulator.startTest(mapArgs);
+    }
+  }
+
+  /**
+   * Returns the command-line value for {@code key}, falling back to the loaded
+   * properties when the argument is absent. May return null if neither defines it.
+   */
+  private String argOrProperty(Map<String, String> args, String key) {
+    String value = args.get(key);
+    return (value != null) ? value : amsJmeterProperties.getProperty(key);
+  }
+
+  public static Properties readProperties(String propertiesFile) {
+    try {
+      Properties properties = new Properties();
+      InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+      if (inputStream == null) {
+        inputStream = new FileInputStream(propertiesFile);
+      }
+      properties.load(inputStream);
+      return properties;
+    } catch (IOException ioEx) {
+      LOG.error("Error reading properties file for jmeter");
+      return null;
+    }
+  }
+
+  /**
+   * Reads the GET request definitions for one app from the class-path resource
+   * {@code ui_metrics_def/<app>.dat}.
+   * <p/>
+   * Lines are accumulated as metric names until a line starting with {@code |} is
+   * reached; that line closes the current request and, by containing "startTime"
+   * and/or "hostname", tells whether the request needs timestamps and/or a host.
+   * Metric names after the last {@code |} line are not turned into a request.
+   *
+   * @param app app id used to build the resource name
+   * @return the parsed requests; an empty list if the resource is missing or unreadable
+   */
+  private static List<GetMetricRequestInfo> readMetricsFromFile(String app) {
+    String fileName = "ui_metrics_def/" + app + ".dat";
+
+    InputStream input = ClassLoader.getSystemResourceAsStream(fileName);
+    if (input == null) {
+      // A missing resource would otherwise surface as an NPE inside InputStreamReader.
+      LOG.error("Cannot find file " + fileName + " for appID " + app);
+      return new ArrayList<>();
+    }
+
+    // Closing the reader also closes the underlying resource stream.
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(input))) {
+      List<GetMetricRequestInfo> metricList = new ArrayList<>();
+      List<String> metrics = new ArrayList<>();
+      String line;
+      while ((line = reader.readLine()) != null) {
+
+        if (line.startsWith("|")) {
+          boolean needsTimestamps = line.contains("startTime");
+          boolean needsHost = line.contains("hostname");
+          metricList.add(new GetMetricRequestInfo(metrics, needsTimestamps, needsHost));
+          // Safe to reuse the list: GetMetricRequestInfo joins the names into a
+          // string in its constructor and keeps no reference to the list.
+          metrics.clear();
+        } else {
+          metrics.add(line);
+        }
+      }
+      return metricList;
+    } catch (IOException e) {
+      LOG.error("Cannot read file " + fileName + " for appID " + app, e);
+      return new ArrayList<>();
+    }
+  }
+
+  /**
+   * Builds one {@link AppGetMetric} per app to be exercised by the GET simulator.
+   * <p/>
+   * The apps come from the comma-separated "apps-to-test" property or, when that
+   * is unset or empty, from every {@link JmeterTestPlanTask.ClientApp} id. Each app
+   * uses the "get-interval" property unless "&lt;app&gt;-get-interval" overrides it.
+   *
+   * @param amsJmeterProperties loaded simulator settings
+   * @return the per-app request definitions, in app order
+   */
+  private static List<AppGetMetric> initializeGetMetricsPayload(Properties amsJmeterProperties) {
+
+    String configuredApps = amsJmeterProperties.getProperty("apps-to-test");
+    String[] appIds;
+
+    if (configuredApps == null || configuredApps.isEmpty()) {
+      JmeterTestPlanTask.ClientApp[] knownApps = JmeterTestPlanTask.ClientApp.values();
+      appIds = new String[knownApps.length];
+      for (int idx = 0; idx < knownApps.length; idx++) {
+        appIds[idx] = knownApps[idx].getId();
+      }
+    } else {
+      appIds = StringUtils.split(configuredApps, ",");
+    }
+
+    List<AppGetMetric> payload = new ArrayList<AppGetMetric>();
+    for (String appId : appIds) {
+      // The global interval is parsed for every app, so it must be present and numeric.
+      int pollInterval = Integer.parseInt(amsJmeterProperties.getProperty("get-interval"));
+      String appOverride = amsJmeterProperties.getProperty(appId + "-get-interval");
+      if (appOverride != null && !appOverride.isEmpty()) {
+        pollInterval = Integer.parseInt(appOverride);
+      }
+      payload.add(new AppGetMetric(readMetricsFromFile(appId), pollInterval, appId));
+    }
+
+    return payload;
+  }
+
+  /**
+   * Schedules {@code numInstances} JMeter test-plan tasks on the executor. Each task
+   * starts immediately and is re-run at the fixed rate given by the
+   * "app-refresh-rate" property, interpreted in milliseconds.
+   *
+   * @param numInstances number of tasks to schedule
+   */
+  public void runTest(int numInstances) {
+
+    int appRefreshRate = Integer.parseInt(amsJmeterProperties.getProperty("app-refresh-rate"));
+    for (int i = 0; i < numInstances; i++) {
+      // The returned future is not needed; the tasks run until the JVM exits.
+      scheduledExecutorService.scheduleAtFixedRate(
+        new JmeterTestPlanTask(appGetMetrics, amsJmeterProperties), 0, appRefreshRate, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Command-line entry point. Turns the arguments into an option map and constructs
+   * the load test, whose constructor starts the simulation.
+   * <p/>
+   * Sample Usage:
+   * java -cp "lib/*":ambari-metrics-timelineservice-2.1.1.0.jar
+   * org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest.AMSJMeterLoadTest
+   * -type U -amsJmeterPropertiesFile {@code <path to ams-jmeter.properties>}
+   *
+   * @param args alternating option names (with a leading dash) and values
+   */
+  public static void main(String[] args) {
+    new AMSJMeterLoadTest(parseArgs(args));
+  }
+
+  /**
+   * Converts alternating {@code -option value} arguments into a map keyed by the
+   * option name without its leading dash(es). The short options {@code t} and
+   * {@code p} are stored under the keys the constructor reads, "type" and
+   * "amsJmeterPropertiesFile".
+   *
+   * @param args raw command-line arguments
+   * @return option name to value map
+   * @throws RuntimeException if no arguments are given
+   * @throws IllegalArgumentException if an option has no value
+   */
+  private static Map<String, String> parseArgs(String[] args) {
+    if (args.length == 0) {
+      printUsage();
+      throw new RuntimeException("Unexpected argument, See usage message.");
+    }
+    if (args.length % 2 != 0) {
+      // Without this check the loop below fails with ArrayIndexOutOfBoundsException.
+      printUsage();
+      throw new IllegalArgumentException("Option " + args[args.length - 1] + " has no value, See usage message.");
+    }
+
+    Map<String, String> mapProps = new HashMap<String, String>();
+    for (int i = 0; i < args.length; i += 2) {
+      mapProps.put(normalizeOptionName(args[i]), args[i + 1]);
+    }
+    return mapProps;
+  }
+
+  /** Strips leading dashes and expands the short option names "t" and "p". */
+  private static String normalizeOptionName(String arg) {
+    int start = 0;
+    while (start < arg.length() && arg.charAt(start) == '-') {
+      start++;
+    }
+    String name = arg.substring(start);
+    if ("t".equals(name)) {
+      return "type";
+    }
+    if ("p".equals(name)) {
+      return "amsJmeterPropertiesFile";
+    }
+    return name;
+  }
+
+  /** Prints the supported command-line options to standard error. */
+  public static void printUsage() {
+    System.err.println("Usage: java AMSJMeterLoadTest [OPTIONS]");
+    System.err.println("Options: ");
+    System.err.println("[-t|-type (S=>Sink/U=>UI)] [-ams-host-port localhost:6188] [-min-host-index 2] [-host-prefix TestHost.] [-num-hosts 2] " +
+      "[-create-master true] [-collection-interval 10000 ] [-send-interval 60000 ] [-p|-amsJmeterPropertiesFile file (Optional)]");
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java
new file mode 100644
index 0000000..bc6428e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import java.util.List;
+
+/**
+ * Groups the GET requests to issue for one app together with the app id and the
+ * interval configured for it.
+ */
+public class AppGetMetric {
+
+  private String app;
+  private int interval;
+  private List<GetMetricRequestInfo> requests;
+
+  /**
+   * @param requests GET requests to issue for the app
+   * @param interval interval configured for the app
+   * @param app      app id
+   */
+  public AppGetMetric(List<GetMetricRequestInfo> requests, int interval, String app) {
+    // Assign the fields directly: calling the public, overridable setters from a
+    // constructor would run subclass code on a partially constructed object.
+    this.requests = requests;
+    this.interval = interval;
+    this.app = app;
+  }
+
+  /** Returns the GET requests for this app, exactly as supplied. */
+  public List<GetMetricRequestInfo> getMetricRequests() {
+    return requests;
+  }
+
+  public void setMetricRequests(List<GetMetricRequestInfo> requests) {
+    this.requests = requests;
+  }
+
+  /** Returns the interval configured for this app. */
+  public int getInterval() {
+    return interval;
+  }
+
+  public void setInterval(int interval) {
+    this.interval = interval;
+  }
+
+  /** Returns the app id. */
+  public String getApp() {
+    return app;
+  }
+
+  public void setApp(String app) {
+    this.app = app;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java
new file mode 100644
index 0000000..60ed3eb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.List;
+
+
+/**
+ * One GET request definition: the metric names joined into a comma-separated
+ * string, plus flags saying whether the request needs timestamps and a host.
+ */
+public class GetMetricRequestInfo {
+
+  private String metricStringPayload;
+  private boolean needsTimestamps;
+  private boolean needsHost;
+
+  /**
+   * @param metrics         metric names; joined with "," at construction time, so
+   *                        later changes to the list do not affect this object
+   * @param needsTimestamps whether the request needs timestamps
+   * @param needsHost       whether the request needs a host
+   */
+  public GetMetricRequestInfo(List<String> metrics, boolean needsTimestamps, boolean needsHost) {
+    // Assign the fields directly: calling the public, overridable setters from a
+    // constructor would run subclass code on a partially constructed object.
+    this.metricStringPayload = StringUtils.join(metrics, ",");
+    this.needsTimestamps = needsTimestamps;
+    this.needsHost = needsHost;
+  }
+
+  /** Returns the comma-separated metric names. */
+  public String getMetricStringPayload() {
+    return metricStringPayload;
+  }
+
+  public void setMetricStringPayload(String metricStringPayload) {
+    this.metricStringPayload = metricStringPayload;
+  }
+
+  /** Returns whether the request needs timestamps. */
+  public boolean needsTimestamps() {
+    return needsTimestamps;
+  }
+
+  public void setNeedsTimestamps(boolean needsTimestamps) {
+    this.needsTimestamps = needsTimestamps;
+  }
+
+  /** Returns whether the request needs a host. */
+  public boolean needsHost() {
+    return needsHost;
+  }
+
+  public void setNeedsHost(boolean needsHost) {
+    this.needsHost = needsHost;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java
new file mode 100644
index 0000000..0590c73
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jmeter.control.LoopController;
+import org.apache.jmeter.engine.StandardJMeterEngine;
+import org.apache.jmeter.protocol.http.sampler.HTTPSampler;
+import org.apache.jmeter.protocol.http.util.HTTPConstants;
+import org.apache.jmeter.reporters.ResultCollector;
+import org.apache.jmeter.reporters.Summariser;
+import org.apache.jmeter.testelement.TestElement;
+import org.apache.jmeter.testelement.TestPlan;
+import org.apache.jmeter.threads.JMeterContextService;
+import org.apache.jmeter.threads.ThreadGroup;
+import org.apache.jmeter.timers.ConstantTimer;
+import org.apache.jmeter.util.JMeterUtils;
+import org.apache.jorphan.collections.HashTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+public class JmeterTestPlanTask implements Runnable {
+
+  // Engine of the most recent run; run() stops it and replaces it with a new one.
+  // NOTE(review): static and unsynchronized, while AMSJMeterLoadTest schedules several
+  // task instances on a thread pool - confirm concurrent runs sharing it are intended.
+  private static StandardJMeterEngine jmeterEngine = null;
+  private final static Logger LOG = LoggerFactory.getLogger(JmeterTestPlanTask.class);
+  // Per-app GET request definitions; run() picks one app at random on each run.
+  private List<AppGetMetric> appGetMetrics;
+  // Simulator settings (AMS host/port/path, host naming, loop counts).
+  private Properties amsJmeterProperties;
+  // Test plan tree and plan element, created once per task and rebuilt on every run.
+  private HashTree amsTestPlanTree;
+  private TestPlan amsTestPlan;
+  // Directory prefix under which the JMeter property files are looked up.
+  private static final String JMETER_HOME = "loadsimulator";
+  private static final String JMETER_PROPERTIES_FILE = JMETER_HOME + "/jmeter.properties";
+  private static final String SAVESERVICE_PROPERTIES_FILE = JMETER_HOME + "/saveservice.properties";
+
+  /** Apps whose metrics can be requested, each with the app id it is known by. */
+  public enum ClientApp {
+    HOST("HOST"),
+    NAMENODE("NAMENODE"),
+    HBASE("HBASE"),
+    NIMBUS("NIMBUS"),
+    KAFKA_BROKER("KAFKA_BROKER"),
+    FLUME_HANDLER("FLUME_HANDLER"),
+    // The only constant whose id differs from its name (dash instead of underscore).
+    AMS_HBASE("AMS-HBASE"),
+    NODEMANAGER("NODEMANAGER"),
+    RESOURCEMANAGER("RESOURCEMANAGER"),
+    DATANODE("DATANODE");
+
+    private final String id;
+
+    ClientApp(String appId) {
+      this.id = appId;
+    }
+
+    /** Returns the app id string for this app. */
+    public String getId() {
+      return this.id;
+    }
+  }
+
+  /**
+   * Creates a task with an empty test plan tree; the plan itself is built in run().
+   *
+   * @param appGetMetrics       per-app GET request definitions to choose from
+   * @param amsJmeterProperties simulator settings
+   */
+  public JmeterTestPlanTask(List<AppGetMetric> appGetMetrics, Properties amsJmeterProperties) {
+    this.appGetMetrics = appGetMetrics;
+    this.amsJmeterProperties = amsJmeterProperties;
+    this.amsTestPlanTree = new HashTree();
+    this.amsTestPlan = new TestPlan("AMS JMeter Load Test plan");
+    System.out.println("Starting AMS Jmeter load testing");
+  }
+
+  public void run() {
+    if (jmeterEngine != null) {
+
+      Object[] threadGroups = amsTestPlanTree.getArray(amsTestPlan);
+      for (Object threadGroupObj : threadGroups) {
+        if (threadGroupObj instanceof ThreadGroup) {
+          ThreadGroup threadGroup = (ThreadGroup) threadGroupObj;
+          threadGroup.stop();
+        }
+      }
+      amsTestPlanTree.clear();
+      jmeterEngine.askThreadsToStop();
+      jmeterEngine.stopTest();
+      JMeterContextService.endTest();
+    }
+
+    //Start the new test plan for the new app.
+    try {
+      //Initialize Jmeter essentials
+      jmeterEngine = new StandardJMeterEngine();
+      JMeterContextService.getContext().setEngine(jmeterEngine);
+
+      //Workaround to supply JMeterUtils with jmeter.prooperties from JAR.
+      JMeterUtils.setJMeterHome("");
+      Field f = new JMeterUtils().getClass().getDeclaredField("appProperties");
+      f.setAccessible(true);
+      f.set(null, AMSJMeterLoadTest.readProperties(JMETER_PROPERTIES_FILE));
+
+      //Copy saveservices.properties file to tmp dir for JMeter to consume.
+      InputStream inputStream = ClassLoader.getSystemResourceAsStream(SAVESERVICE_PROPERTIES_FILE);
+      if (inputStream == null) {
+        inputStream = new FileInputStream(SAVESERVICE_PROPERTIES_FILE);
+      }
+      String tmpDir = System.getProperty("java.io.tmpdir");
+      OutputStream outputStream = new FileOutputStream(tmpDir + "/saveservice.properties");
+      IOUtils.copy(inputStream, outputStream);
+      outputStream.close();
+      JMeterUtils.setProperty("saveservice_properties", tmpDir + "/saveservice.properties");
+
+      //Initialize Test plan
+      amsTestPlan.setProperty(TestElement.TEST_CLASS, TestPlan.class.getName());
+      amsTestPlanTree.add("AMS Test plan", amsTestPlan);
+
+      //Choose a random APP to run the perform GET metrics request.
+      int currentAppIndex = new Random().nextInt(appGetMetrics.size());
+
+      //Create ThreadGroup for the App
+      createThreadGroupHashTree(currentAppIndex, amsJmeterProperties, amsTestPlanTree, amsTestPlan);
+
+      //Geneates the JMX file that you can use through the GUI mode.
+      //SaveService.saveTree(amsTestPlanTree, new FileOutputStream(JMETER_HOME + "/" + "amsTestPlan.jmx"));
+
+      //Summarizer output to get test progress in stdout like.
+      Summariser summariser = null;
+      String summariserName = JMeterUtils.getPropDefault("summariser.name", "summary");
+      if (summariserName.length() > 0) {
+        summariser = new Summariser(summariserName);
+      }
+
+      //Store execution results into a .jtl file
+      String jmeterLogFile = tmpDir + "/amsJmeterTestResults.jtl";
+      ResultCollector resultCollector = new ResultCollector(summariser);
+      resultCollector.setFilename(jmeterLogFile);
+      amsTestPlanTree.add(amsTestPlanTree.getArray()[0], resultCollector);
+      jmeterEngine.configure(amsTestPlanTree);
+      jmeterEngine.run();
+
+      LOG.info("AMS Jmeter Test started up successfully");
+
+    } catch (Exception ioEx) {
+      amsTestPlanTree.clear();
+      jmeterEngine.askThreadsToStop();
+      jmeterEngine.stopTest();
+      JMeterContextService.endTest();
+      LOG.error("Error occurred while running AMS load test : " + ioEx.getMessage());
+      ioEx.printStackTrace();
+    }
+  }
+
+  /** Creates a JMeter constant timer whose delay is the decimal string form of {@code delay}. */
+  private ConstantTimer createConstantTimer(int delay) {
+    ConstantTimer constantTimer = new ConstantTimer();
+    constantTimer.setDelay(Integer.toString(delay));
+    return constantTimer;
+  }
+
+  /**
+   * Builds the query parameters for one GET request of the given app.
+   * <p/>
+   * "appId" and "metricNames" are always set. "startTime"/"endTime" (the last hour
+   * up to now) are added when the request needs timestamps, and "hostname" when it
+   * needs a host: the "ams-host" property for AMS-HBASE, a random simulated host
+   * for HOST and NODEMANAGER, and prefix + "&lt;app&gt;-host" property + suffix otherwise.
+   *
+   * @param app                 app id
+   * @param request             request definition
+   * @param amsJmeterProperties simulator settings
+   * @return parameter name to value map
+   */
+  private Map<String, String> getAppSpecificParameters(String app, GetMetricRequestInfo request, Properties amsJmeterProperties) {
+
+    String prefix = amsJmeterProperties.getProperty("host-prefix");
+    String suffix = amsJmeterProperties.getProperty("host-suffix");
+    // Parsed unconditionally, so both properties must be present and numeric
+    // even for requests that end up not needing a host.
+    int firstHostIndex = Integer.parseInt(amsJmeterProperties.getProperty("min-host-index"));
+    int hostCount = Integer.parseInt(amsJmeterProperties.getProperty("num-hosts"));
+
+    Map<String, String> queryParams = new HashMap<String, String>();
+    queryParams.put("appId", app);
+
+    if (request.needsTimestamps()) {
+      long now = System.currentTimeMillis();
+      queryParams.put("startTime", String.valueOf(now - 3600 * 1000));
+      queryParams.put("endTime", String.valueOf(now));
+    }
+
+    if (request.needsHost()) {
+      String hostname;
+      if (ClientApp.AMS_HBASE.getId().equals(app)) {
+        hostname = amsJmeterProperties.getProperty("ams-host");
+      } else if (ClientApp.HOST.getId().equals(app) || ClientApp.NODEMANAGER.getId().equals(app)) {
+        int hostIndex = firstHostIndex + new Random().nextInt(hostCount);
+        hostname = prefix + hostIndex + suffix;
+      } else {
+        hostname = prefix + amsJmeterProperties.getProperty(app + "-host") + suffix;
+      }
+      queryParams.put("hostname", hostname);
+    }
+
+    queryParams.put("metricNames", request.getMetricStringPayload());
+    return queryParams;
+  }
+
+  /**
+   * Adds one single-threaded thread group per GET request of the selected app to
+   * the test plan. Each group holds an HTTP GET sampler and, when more than one
+   * loop is configured, a constant timer using the app's interval. All groups are
+   * given the same loop controller instance.
+   *
+   * @param appIndex            index into the app request definitions
+   * @param amsJmeterProperties simulator settings; "ams-host-port" must be host:port
+   * @param amsTestPlanTree     tree to add the thread groups to
+   * @param amsTestPlan         plan element the thread groups are attached under
+   */
+  private void createThreadGroupHashTree(int appIndex, Properties amsJmeterProperties, HashTree amsTestPlanTree, TestPlan amsTestPlan) {
+
+    AppGetMetric selected = appGetMetrics.get(appIndex);
+    String appId = selected.getApp();
+    int timerDelay = selected.getInterval();
+
+    //Read AMS connection settings.
+    String[] hostAndPort = amsJmeterProperties.getProperty("ams-host-port").split(":");
+    String host = hostAndPort[0];
+    String path = amsJmeterProperties.getProperty("ams-path");
+    int port = Integer.parseInt(hostAndPort[1]);
+    int loopCount = Integer.parseInt(amsJmeterProperties.getProperty("num-get-calls-per-app"));
+
+    LoopController sharedLoop = createLoopController(appId + " GET loop controller", loopCount, false);
+    // A timer is only attached when the request is repeated.
+    boolean repeated = loopCount > 1;
+
+    for (GetMetricRequestInfo requestInfo : selected.getMetricRequests()) {
+
+      ThreadGroup group = createThreadGroup(appId + " GET threadGroup", 1, 0, sharedLoop);
+      HashTree groupTree = amsTestPlanTree.add(amsTestPlan, group);
+
+      Map<String, String> queryParams = getAppSpecificParameters(appId, requestInfo, amsJmeterProperties);
+      HTTPSampler getSampler = createGetSampler("GET " + appId + " metrics", host, port, path, null, queryParams);
+
+      if (repeated) {
+        groupTree.add(createConstantTimer(timerDelay));
+      }
+
+      groupTree.add(getSampler);
+    }
+  }
+
+  /**
+   * Creates an HTTP GET sampler for the given endpoint with one argument per map entry.
+   *
+   * @param name       sampler name
+   * @param domain     target host
+   * @param port       target port
+   * @param path       request path
+   * @param encoding   content encoding, or null to leave it unset
+   * @param parameters request arguments
+   * @return the configured sampler
+   */
+  private HTTPSampler createGetSampler(String name, String domain, int port, String path, String encoding, Map<String, String> parameters) {
+
+    HTTPSampler getSampler = new HTTPSampler();
+    getSampler.setDomain(domain);
+    getSampler.setPort(port);
+    getSampler.setPath(path);
+    getSampler.setMethod(HTTPConstants.GET);
+
+    if (encoding != null) {
+      getSampler.setContentEncoding(encoding);
+    }
+
+    for (Map.Entry<String, String> parameter : parameters.entrySet()) {
+      String argName = parameter.getKey();
+      String argValue = parameter.getValue();
+      getSampler.addArgument(argName, argValue);
+    }
+    getSampler.setName(name);
+    return getSampler;
+  }
+
+  /**
+   * Creates and initializes a named loop controller.
+   *
+   * @param name            controller name
+   * @param numLoops        number of loops to set
+   * @param continueForever value passed to setContinueForever
+   * @return the configured controller
+   */
+  private LoopController createLoopController(String name, int numLoops, boolean continueForever) {
+    // The setter/initialize order is kept exactly as it was.
+    LoopController controller = new LoopController();
+    controller.setLoops(numLoops);
+    controller.setProperty(TestElement.TEST_CLASS, LoopController.class.getName());
+    controller.initialize();
+    controller.setContinueForever(continueForever);
+    controller.setName(name);
+    return controller;
+  }
+
+  /**
+   * Creates a named JMeter thread group that runs the given loop controller.
+   *
+   * @param name           group name
+   * @param numThreads     number of threads
+   * @param rampUp         ramp-up value passed to setRampUp
+   * @param loopController controller set as the group's sampler controller
+   * @return the configured thread group
+   */
+  private ThreadGroup createThreadGroup(String name, int numThreads, int rampUp, LoopController loopController) {
+    ThreadGroup group = new ThreadGroup();
+    group.setName(name);
+    group.setNumThreads(numThreads);
+    group.setRampUp(rampUp);
+    group.setSamplerController(loopController);
+    group.setProperty(TestElement.TEST_CLASS, ThreadGroup.class.getName());
+    return group;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java
new file mode 100644
index 0000000..9c8e641
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.net;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link RestMetricsSender#pushMetrics} against a strict mock of
+ * {@link UrlService}, injected by overriding getConnectedUrlService().
+ */
+public class TestRestMetricsSender {
+
+  /** A successful send returns the service response and is followed by disconnect(). */
+  @Test
+  public void testPushMetrics() throws Exception {
+    final UrlService svcMock = createStrictMock(UrlService.class);
+    final String payload = "test";
+    final String expectedResponse = "mockResponse";
+
+    // Strict mock: send() must be called first, then disconnect().
+    expect(svcMock.send(anyString())).andReturn(expectedResponse);
+    svcMock.disconnect();
+    expectLastCall();
+
+    replay(svcMock);
+
+    RestMetricsSender sender = new RestMetricsSender("expectedHostName") {
+      @Override
+      protected UrlService getConnectedUrlService() throws IOException {
+        return svcMock;
+      }
+    };
+    String response = sender.pushMetrics(payload);
+
+    verify(svcMock);
+    assertEquals("pushMetrics should return the service response", expectedResponse, response);
+  }
+
+  /**
+   * When send() throws an IOException, pushMetrics must not propagate it and
+   * must still call disconnect().
+   */
+  @Test
+  public void testPushMetricsFailed() throws Exception {
+    final UrlService svcMock = createStrictMock(UrlService.class);
+    final String payload = "test";
+    RestMetricsSender sender = new RestMetricsSender("expectedHostName") {
+      @Override
+      protected UrlService getConnectedUrlService() throws IOException {
+        return svcMock;
+      }
+    };
+
+    expect(svcMock.send(anyString())).andThrow(new IOException());
+    svcMock.disconnect();
+    expectLastCall();
+
+    replay(svcMock);
+
+    // The return value is not checked here; only the interaction with the mock is.
+    sender.pushMetrics(payload);
+
+    verify(svcMock);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java
new file mode 100644
index 0000000..29ebda4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.net;
+
+
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+/** Smoke test for {@link StdOutMetricsSender} writing to an in-memory stream. */
+public class TestStdOutMetricsSender {
+
+  /**
+   * Pushes one payload through the sender and echoes what it wrote.
+   * NOTE(review): nothing is asserted about the captured output - confirm whether
+   * an expectation on its content should be added.
+   */
+  @Test
+  public void testPushMetrics() throws Exception {
+    ByteArrayOutputStream captured = new ByteArrayOutputStream();
+    StdOutMetricsSender sender = new StdOutMetricsSender("expectedHostName", new PrintStream(captured));
+
+    sender.pushMetrics("test");
+
+    String written = captured.toString();
+    System.out.println(written);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java
new file mode 100644
index 0000000..a1801b0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.loadsimulator.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that {@link RandomMetricsProvider} produces values between its configured bounds.
+ */
+public class TestRandomMetricsProvider {
+
+  /**
+   * Draws one value and verifies it lies strictly between the two bounds passed to the
+   * constructor.
+   */
+  @Test
+  public void testReturnSingle() {
+    final double lowerBound = 5.25;
+    final double upperBound = 5.40;
+
+    double generated = new RandomMetricsProvider(lowerBound, upperBound).next();
+
+    boolean insideRange = generated > lowerBound && generated < upperBound;
+    assertTrue("Generated metric should be in range", insideRange);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java
new file mode 100644
index 0000000..9011e75
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.util;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TimeStampProvider}: the first timestamp returned and the batch of
+ * timestamps returned for one send interval.
+ */
+public class TestTimeStampProvider {
+
+  /**
+   * The first call to {@code next()} is expected to return the start time given to the
+   * constructor.
+   */
+  @Test
+  public void testReturnSingle() {
+    long expectedFirst = 1411663170112L;
+    int stepMillis = 5000;
+    TimeStampProvider provider = new TimeStampProvider(expectedFirst, stepMillis, 0);
+
+    long actualFirst = provider.next();
+
+    assertEquals("First generated timestamp should match starttime", expectedFirst, actualFirst);
+  }
+
+  /**
+   * With start 0, collect interval 5 and send interval 30, one batch is expected to hold the
+   * six values 0, 5, 10, 15, 20 and 25 (order is not checked).
+   */
+  @Test
+  public void testReturnTstampsForSendInterval() throws Exception {
+    long origin = 0;
+    int collectEvery = 5;
+    int sendEvery = 30;
+
+    long[] generated =
+      new TimeStampProvider(origin, collectEvery, sendEvery).timestampsForNextInterval();
+
+    assertThat(generated).hasSize(6).containsOnly(0, 5, 10, 15, 20, 25);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
new file mode 100644
index 0000000..0553d4c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Base class for integration tests that run against the Phoenix test driver provided by
+ * {@link BaseTest}.
+ *
+ * <p>Before each test it creates a {@link PhoenixHBaseAccessor}, opens a JDBC connection,
+ * initialises the metric schema and wires up a {@link TimelineMetricMetadataManager}. After
+ * each test it deletes the rows of the metric tables and closes the connection it opened.
+ */
+public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
+
+  protected static final long BATCH_SIZE = 3;
+  /** Connection opened in {@link #setUp()} and closed again in {@link #tearDown()}. */
+  protected Connection conn;
+  protected PhoenixHBaseAccessor hdb;
+  protected TimelineMetricMetadataManager metadataManager;
+
+  /** Logger named after the concrete test class; assigned in the constructor. */
+  public final Log LOG;
+
+  public AbstractMiniHBaseClusterTest() {
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  /**
+   * Builds the driver properties on top of {@link #getDefaultProps()} and hands them to
+   * {@code setUpTestDriver}.
+   */
+  @BeforeClass
+  public static void doSetup() throws Exception {
+    Map<String, String> props = getDefaultProps();
+    props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, "false");
+    props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+    props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+    // Make a small batch size to test multiple calls to reserve sequences
+    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+    // Must update config before starting server
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  /** Drops the non-system tables and tears the mini cluster down once per test class. */
+  @AfterClass
+  public static void doTeardown() throws Exception {
+    dropNonSystemTables();
+    tearDownMiniCluster();
+  }
+
+  /**
+   * Prepares the accessor, the shared connection, the metric schema and the metadata manager
+   * for a single test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    Logger.getLogger("org.apache.ambari.metrics.core.timeline").setLevel(Level.DEBUG);
+    hdb = createTestableHBaseAccessor();
+    // inits connection, starts mini cluster
+    conn = getConnection(getUrl());
+
+    hdb.initMetricSchema();
+    metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb);
+    hdb.setMetadataInstance(metadataManager);
+  }
+
+  /**
+   * Deletes all rows of {@code tableName}; a failure is logged and otherwise ignored so the
+   * remaining tables are still cleaned.
+   */
+  private void deleteTableIgnoringExceptions(Statement stmt, String tableName) {
+    try {
+      stmt.execute("delete from " + tableName);
+    } catch (Exception e) {
+      LOG.warn("Exception on delete table " + tableName, e);
+    }
+  }
+
+  /** Closes {@code stmt} if it is non-null, ignoring any {@link SQLException}. */
+  private static void closeStatementQuietly(Statement stmt) {
+    if (stmt == null) {
+      return;
+    }
+    try {
+      stmt.close();
+    } catch (SQLException ignored) {
+      // Best-effort cleanup: nothing useful can be done if close fails.
+    }
+  }
+
+  /** Closes {@code connection} if it is non-null, ignoring any {@link SQLException}. */
+  private static void closeConnectionQuietly(Connection connection) {
+    if (connection == null) {
+      return;
+    }
+    try {
+      connection.close();
+    } catch (SQLException ignored) {
+      // Best-effort cleanup: nothing useful can be done if close fails.
+    }
+  }
+
+  /**
+   * Empties the metric tables through a dedicated connection and then releases both that
+   * connection and the one opened in {@link #setUp()}. Failures are logged and never thrown.
+   */
+  @After
+  public void tearDown() {
+    Connection cleanupConn = null;
+    Statement stmt = null;
+    try {
+      cleanupConn = getConnection(getUrl());
+      stmt = cleanupConn.createStatement();
+
+      deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE");
+      deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_MINUTE");
+      deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_HOURLY");
+      deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_DAILY");
+      deleteTableIgnoringExceptions(stmt, "METRIC_RECORD");
+      deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_MINUTE");
+      deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_HOURLY");
+      deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_DAILY");
+      deleteTableIgnoringExceptions(stmt, "METRICS_METADATA");
+      deleteTableIgnoringExceptions(stmt, "HOSTED_APPS_METADATA");
+
+      cleanupConn.commit();
+    } catch (Exception e) {
+      LOG.warn("Error on deleting HBase schema.", e);
+    } finally {
+      closeStatementQuietly(stmt);
+      closeConnectionQuietly(cleanupConn);
+      // The field connection is opened anew in every setUp(); close it here so it does
+      // not leak once per test.
+      closeConnectionQuietly(conn);
+    }
+  }
+
+  /**
+   * Returns a fresh, mutable map holding the baseline driver properties used by
+   * {@link #doSetup()}.
+   */
+  public static Map<String, String> getDefaultProps() {
+    Map<String, String> props = new HashMap<String, String>();
+    // Must update config before starting server
+    props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+      Boolean.FALSE.toString());
+    props.put("java.security.krb5.realm", "");
+    props.put("java.security.krb5.kdc", "");
+    return props;
+  }
+
+  /**
+   * Opens a JDBC connection using a deep copy of {@code TEST_PROPERTIES}.
+   *
+   * @param url JDBC URL to connect to; when {@code null}, {@code getUrl()} is used instead
+   * @return a new connection that the caller is responsible for closing
+   * @throws SQLException if the driver cannot open the connection
+   */
+  protected Connection getConnection(String url) throws SQLException {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // Honour the requested URL; it used to be silently ignored in favour of getUrl().
+    String target = (url != null) ? url : getUrl();
+    return DriverManager.getConnection(target, props);
+  }
+
+  /**
+   * A canary test. Will show if the infrastructure is set-up correctly.
+   */
+  @Test
+  public void testClusterOK() throws Exception {
+    String sampleDDL = "CREATE TABLE TEST_METRICS " +
+      "(TEST_COLUMN VARCHAR " +
+      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+      "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " +
+      "TTL=86400, COMPRESSION='NONE' ";
+
+    Connection canaryConn = getConnection(getUrl());
+    Statement stmt = null;
+    try {
+      canaryConn.setAutoCommit(true);
+
+      stmt = canaryConn.createStatement();
+      stmt.executeUpdate(sampleDDL);
+      canaryConn.commit();
+
+      ResultSet rs = stmt.executeQuery(
+        "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+      // An aggregate query must yield a row; fail clearly instead of reading past the end.
+      assertThat(rs.next()).isTrue();
+      long l = rs.getLong(1);
+      assertThat(l).isGreaterThanOrEqualTo(0);
+
+      stmt.execute("DROP TABLE TEST_METRICS");
+    } finally {
+      // Release JDBC resources even when an assertion or statement fails.
+      closeStatementQuietly(stmt);
+      closeConnectionQuietly(canaryConn);
+    }
+  }
+
+  /**
+   * Creates a {@link PhoenixHBaseAccessor} whose connections and HBase {@link Admin} come from
+   * the test driver rather than from a production data source.
+   *
+   * <p>Both provider callbacks log a {@link SQLException} and return {@code null} instead of
+   * propagating it.
+   */
+  protected PhoenixHBaseAccessor createTestableHBaseAccessor() {
+    Configuration metricsConf = new Configuration();
+    metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+    // Unit tests insert values into the future
+    metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
+
+    return
+      new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf),
+        new PhoenixConnectionProvider() {
+          @Override
+          public Admin getHBaseAdmin() throws IOException {
+            try {
+              return driver.getConnectionQueryServices(null, null).getAdmin();
+            } catch (SQLException e) {
+              // Pass the throwable separately so the stack trace is logged.
+              LOG.error("Unable to obtain HBase Admin from the Phoenix test driver.", e);
+            }
+            return null;
+          }
+
+          @Override
+          public Connection getConnection() {
+            Connection connection = null;
+            try {
+              connection = DriverManager.getConnection(getUrl());
+            } catch (SQLException e) {
+              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+            }
+            return connection;
+          }
+
+        });
+  }
+
+  /**
+   * Upserts every metric of {@code metrics} into the metrics record table and commits once
+   * at the end.
+   *
+   * <p>A metric for which no UUID can be computed is logged and skipped. A failed upsert of
+   * one metric is logged and does not stop the remaining metrics from being written.
+   *
+   * @param conn connection used for the upserts and the final commit; not closed here
+   * @param metrics container of the metrics to write; an empty or missing list is a no-op
+   * @throws SQLException if preparing the statement or committing fails
+   * @throws IOException declared for callers; see the helpers invoked in the body
+   */
+  protected void insertMetricRecords(Connection conn, TimelineMetrics metrics)
+    throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    PreparedStatement metricRecordStmt = null;
+
+    try {
+      metricRecordStmt = conn.prepareStatement(String.format(
+        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+      for (TimelineMetric metric : timelineMetrics) {
+        metricRecordStmt.clearParameters();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("host: " + metric.getHostName() + ", " +
+            "metricName = " + metric.getMetricName() + ", " +
+            "values: " + metric.getMetricValues());
+        }
+        double[] aggregates =  AggregatorUtils.calculateAggregates(
+          metric.getMetricValues());
+
+        byte[] uuid = metadataManager.getUuid(metric);
+        if (uuid == null) {
+          LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+          continue;
+        }
+        metricRecordStmt.setBytes(1, uuid);
+        metricRecordStmt.setLong(2, metric.getStartTime());
+        metricRecordStmt.setDouble(3, aggregates[0]);
+        metricRecordStmt.setDouble(4, aggregates[1]);
+        metricRecordStmt.setDouble(5, aggregates[2]);
+        metricRecordStmt.setInt(6, (int) aggregates[3]);
+        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(7, json);
+
+        try {
+          int row = metricRecordStmt.executeUpdate();
+          LOG.info("Inserted " + row + " rows.");
+        } catch (SQLException sql) {
+          // Keep going with the next metric, but log which one failed and why.
+          LOG.error("Failed to upsert metric record for " + metric.getMetricName(), sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      closeStatementQuietly(metricRecordStmt);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java
new file mode 100644
index 0000000..d3fc50f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for tests that talk to Phoenix through
+ * {@link TestUtil#PHOENIX_CONNECTIONLESS_JDBC_URL}. A {@link PhoenixTestDriver} is registered
+ * before each test and closed and deregistered afterwards.
+ */
+public abstract class AbstractPhoenixConnectionlessTest extends BaseTest {
+
+  /** Returns the JDBC URL used by these tests. */
+  protected static String getUrl() {
+    return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+  }
+
+  /** Returns the JDBC URL with the tenant id attribute appended for {@code tenantId}. */
+  protected static String getUrl(String tenantId) {
+    return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+  }
+
+  /** Driver registered for the current test; {@code null} between tests. */
+  protected static PhoenixTestDriver driver;
+
+  /**
+   * Registers the test driver for {@code url} and opens an initial connection through it.
+   * Asserts that no driver is left over from a previous test.
+   */
+  private static void startServer(String url) throws Exception {
+    assertNull(driver);
+    // only load the test driver if we are testing locally - for integration tests, we want to
+    // test on a wider scale
+    if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+      driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+      assertTrue(DriverManager.getDriver(url) == driver);
+      driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+    }
+  }
+
+  /**
+   * Creates and registers the {@link PhoenixTestDriver} on first use and returns the same
+   * instance on later calls until it is reset in {@link #tearDown()}.
+   */
+  protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
+    if (driver == null) {
+      driver = new PhoenixTestDriver(props);
+      DriverManager.registerDriver(driver);
+    }
+    return driver;
+  }
+
+  private String connUrl;
+
+  @Before
+  public void setup() throws Exception {
+    connUrl = getUrl();
+    startServer(connUrl);
+  }
+
+  /**
+   * Executes a sample {@code CREATE TABLE} statement and commits it; the test passes when no
+   * exception is thrown.
+   */
+  @Test
+  public void testStorageSystemInitialized() throws Exception {
+    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
+      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    try {
+      conn = DriverManager.getConnection(connUrl);
+      stmt = conn.prepareStatement(sampleDDL);
+      stmt.execute();
+      conn.commit();
+    } finally {
+      // Nested finally: the connection is closed even when closing the statement throws.
+      try {
+        if (stmt != null) {
+          stmt.close();
+        }
+      } finally {
+        if (conn != null) {
+          conn.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the driver and, whether or not closing succeeds, clears the static reference and
+   * deregisters it so the next test starts from a clean state.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (driver != null) {
+      try {
+        driver.close();
+      } finally {
+        PhoenixTestDriver phoenixTestDriver = driver;
+        driver = null;
+        DriverManager.deregisterDriver(phoenixTestDriver);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java
new file mode 100644
index 0000000..03e39f7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.fromMetricName;
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.RATE;
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.DIFF;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests how {@link Function#fromMetricName(String)} maps metric-name suffixes to a
+ * {@link Function}.
+ */
+public class FunctionTest {
+
+  /**
+   * Covers an aggregate suffix, a post-processing suffix, both combined, and a plain metric
+   * name without any suffix.
+   */
+  @Test
+  public void testCreation() throws Exception {
+    // Aggregate only
+    assertThat(fromMetricName("Metric._avg")).isEqualTo(new Function(AVG, null));
+
+    // Rate combined with an aggregate
+    assertThat(fromMetricName("Metric._rate._avg")).isEqualTo(new Function(AVG, RATE));
+
+    // No suffix at all
+    assertThat(fromMetricName("bytes_in")).isEqualTo(Function.DEFAULT_VALUE_FUNCTION);
+
+    // Rate support without aggregates
+    assertThat(fromMetricName("Metric._rate")).isEqualTo(new Function(null, RATE));
+
+    // Diff support
+    assertThat(fromMetricName("Metric._diff._avg")).isEqualTo(new Function(AVG, DIFF));
+
+    // Diff support without aggregates
+    assertThat(fromMetricName("Metric._diff")).isEqualTo(new Function(null, DIFF));
+  }
+
+  /**
+   * Disabled: documents the expectation that an unknown suffix raises
+   * {@link Function.FunctionFormatException}.
+   */
+  @Ignore // If unknown function: behavior is best effort query without function
+  @Test(expected = Function.FunctionFormatException.class)
+  public void testNotAFunction() throws Exception {
+    fromMetricName("bytes._not._afunction");
+  }
+}