You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/01/08 18:19:35 UTC
[09/28] ambari git commit: AMBARI-22740 : Fix integration test for
HBase in branch-3.0-ams due to UUID changes. (avijayan)
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
deleted file mode 100644
index d3da500..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-@Singleton
-@Path("/ws/v1/timeline")
-public class TimelineWebServices {
- private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
-
- private TimelineMetricStore timelineMetricStore;
-
- @Inject
- public TimelineWebServices(TimelineMetricStore timelineMetricStore) {
- this.timelineMetricStore = timelineMetricStore;
- }
-
- @XmlRootElement(name = "about")
- @XmlAccessorType(XmlAccessType.NONE)
- @Public
- @Unstable
- public static class AboutInfo {
-
- private String about;
-
- public AboutInfo() {
-
- }
-
- public AboutInfo(String about) {
- this.about = about;
- }
-
- @XmlElement(name = "About")
- public String getAbout() {
- return about;
- }
-
- public void setAbout(String about) {
- this.about = about;
- }
-
- }
-
- /**
- * Return the description of the timeline web services.
- */
- @GET
- @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public AboutInfo about(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res) {
- init(res);
- return new AboutInfo("AMS API");
- }
-
- /**
- * Store the given metrics into the timeline store, and return errors that
- * happened during storing.
- */
- @Path("/metrics")
- @POST
- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public TimelinePutResponse postMetrics(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- TimelineMetrics metrics) {
-
- init(res);
- if (metrics == null) {
- return new TimelinePutResponse();
- }
-
- try {
-
- // TODO: Check ACLs for MetricEntity using the TimelineACLManager.
- // TODO: Save owner of the MetricEntity.
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing metrics: " +
- TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
- }
-
- return timelineMetricStore.putMetrics(metrics);
-
- } catch (Exception e) {
- LOG.error("Error saving metrics.", e);
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- /**
- * Store the given metrics into the timeline store, and return errors that
- * happened during storing.
- */
- @Path("/metrics/aggregated")
- @POST
- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public TimelinePutResponse postAggregatedMetrics(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- AggregationResult metrics) {
-
- init(res);
- if (metrics == null) {
- return new TimelinePutResponse();
- }
-
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing aggregated metrics: " +
- TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
- }
-
- return timelineMetricStore.putHostAggregatedMetrics(metrics);
- } catch (Exception e) {
- LOG.error("Error saving metrics.", e);
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- @Path("/containermetrics")
- @POST
- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public TimelinePutResponse postContainerMetrics(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- List<ContainerMetric> metrics) {
- init(res);
- if (metrics == null || metrics.isEmpty()) {
- return new TimelinePutResponse();
- }
-
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing container metrics: " + TimelineUtils
- .dumpTimelineRecordtoJSON(metrics, true));
- }
-
- return timelineMetricStore.putContainerMetrics(metrics);
-
- } catch (Exception e) {
- LOG.error("Error saving metrics.", e);
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- /**
- * Query for a set of different metrics satisfying the filter criteria.
- * All query params are optional. The default limit will apply if none
- * specified.
- *
- * @param metricNames Comma separated list of metrics to retrieve.
- * @param appId Application Id for the requested metrics.
- * @param instanceId Application instance id.
- * @param hostname Hostname where the metrics originated.
- * @param startTime Start time for the metric records retrieved.
- * @param precision Precision [ seconds, minutes, hours ]
- * @param limit limit on total number of {@link TimelineMetric} records
- * retrieved.
- * @return {@link @TimelineMetrics}
- */
- @GET
- @Path("/metrics")
- @Produces({ MediaType.APPLICATION_JSON })
- public TimelineMetrics getTimelineMetrics(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("metricNames") String metricNames,
- @QueryParam("appId") String appId,
- @QueryParam("instanceId") String instanceId,
- @QueryParam("hostname") String hostname,
- @QueryParam("startTime") String startTime,
- @QueryParam("endTime") String endTime,
- @QueryParam("precision") String precision,
- @QueryParam("limit") String limit,
- @QueryParam("grouped") String grouped,
- @QueryParam("topN") String topN,
- @QueryParam("topNFunction") String topNFunction,
- @QueryParam("isBottomN") String isBottomN,
- @QueryParam("seriesAggregateFunction") String seriesAggregateFunction
- ) {
- init(res);
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request for metrics => metricNames: " + metricNames + ", " +
- "appId: " + appId + ", instanceId: " + instanceId + ", " +
- "hostname: " + hostname + ", startTime: " + startTime + ", " +
- "endTime: " + endTime + ", " +
- "precision: " + precision + "seriesAggregateFunction: " + seriesAggregateFunction);
- }
-
- return timelineMetricStore.getTimelineMetrics(
- parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId),
- parseLongStr(startTime), parseLongStr(endTime),
- Precision.getPrecision(precision), parseIntStr(limit),
- parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
- seriesAggregateFunction);
-
- } catch (NumberFormatException ne) {
- throw new BadRequestException("startTime and limit should be numeric " +
- "values");
- } catch (Precision.PrecisionFormatException pfe) {
- throw new BadRequestException("precision should be seconds, minutes " +
- "or hours");
- } catch (PrecisionLimitExceededException iae) {
- throw new PrecisionLimitExceededException(iae.getMessage());
- } catch (IllegalArgumentException iae) {
- throw new BadRequestException(iae.getMessage());
- } catch (SQLException | IOException e) {
- throw new WebApplicationException(e,
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- @GET
- @Path("/metrics/metadata")
- @Produces({ MediaType.APPLICATION_JSON })
- public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("appId") String appId,
- @QueryParam("metricName") String metricPattern,
- @QueryParam("includeAll") String includeBlacklistedMetrics
- ) {
- init(res);
-
- try {
- return timelineMetricStore.getTimelineMetricMetadata(
- parseStr(appId),
- parseStr(metricPattern),
- parseBoolean(includeBlacklistedMetrics));
- } catch (Exception e) {
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- @GET
- @Path("/metrics/hosts")
- @Produces({ MediaType.APPLICATION_JSON })
- public Map<String, Set<String>> getHostedAppsMetadata(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res
- ) {
- init(res);
-
- try {
- return timelineMetricStore.getHostAppsMetadata();
- } catch (Exception e) {
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- @GET
- @Path("/metrics/instances")
- @Produces({ MediaType.APPLICATION_JSON })
- public Map<String, Map<String, Set<String>>> getClusterHostsMetadata(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("appId") String appId,
- @QueryParam("instanceId") String instanceId
- ) {
- init(res);
-
- try {
- return timelineMetricStore.getInstanceHostsMetadata(instanceId, appId);
- } catch (Exception e) {
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- @GET
- @Path("/metrics/uuid")
- @Produces({ MediaType.APPLICATION_JSON })
- public byte[] getUuid(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("metricName") String metricName,
- @QueryParam("appId") String appId,
- @QueryParam("instanceId") String instanceId,
- @QueryParam("hostname") String hostname
- ) {
- init(res);
-
- try {
- return timelineMetricStore.getUuid(metricName, appId, instanceId, hostname);
- } catch (Exception e) {
- throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- /**
- * This is a discovery endpoint that advertises known live collector
- * instances. Note: It will always answer with current instance as live.
- * This can be utilized as a liveliness pinger endpoint since the instance
- * names are cached and thereby no synchronous calls result from this API
- *
- * @return List<String> hostnames</String>
- */
- @GET
- @Path("/metrics/livenodes")
- @Produces({ MediaType.APPLICATION_JSON })
- public List<String> getLiveCollectorNodes(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res
- ) {
- init(res);
-
- return timelineMetricStore.getLiveInstances();
- }
-
- private void init(HttpServletResponse response) {
- response.setContentType(null);
- }
-
- private static SortedSet<String> parseArrayStr(String str, String delimiter) {
- if (str == null) {
- return null;
- }
- SortedSet<String> strSet = new TreeSet<String>();
- String[] strs = str.split(delimiter);
- for (String aStr : strs) {
- strSet.add(aStr.trim());
- }
- return strSet;
- }
-
- private static NameValuePair parsePairStr(String str, String delimiter) {
- if (str == null) {
- return null;
- }
- String[] strs = str.split(delimiter, 2);
- try {
- return new NameValuePair(strs[0].trim(),
- GenericObjectMapper.OBJECT_READER.readValue(strs[1].trim()));
- } catch (Exception e) {
- // didn't work as an Object, keep it as a String
- return new NameValuePair(strs[0].trim(), strs[1].trim());
- }
- }
-
- private static Collection<NameValuePair> parsePairsStr(
- String str, String aDelimiter, String pDelimiter) {
- if (str == null) {
- return null;
- }
- String[] strs = str.split(aDelimiter);
- Set<NameValuePair> pairs = new HashSet<NameValuePair>();
- for (String aStr : strs) {
- pairs.add(parsePairStr(aStr, pDelimiter));
- }
- return pairs;
- }
-
- private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
- if (str == null) {
- return null;
- }
- String[] strs = str.split(delimiter);
- List<Field> fieldList = new ArrayList<Field>();
- for (String s : strs) {
- s = s.trim().toUpperCase();
- if (s.equals("EVENTS")) {
- fieldList.add(Field.EVENTS);
- } else if (s.equals("LASTEVENTONLY")) {
- fieldList.add(Field.LAST_EVENT_ONLY);
- } else if (s.equals("RELATEDENTITIES")) {
- fieldList.add(Field.RELATED_ENTITIES);
- } else if (s.equals("PRIMARYFILTERS")) {
- fieldList.add(Field.PRIMARY_FILTERS);
- } else if (s.equals("OTHERINFO")) {
- fieldList.add(Field.OTHER_INFO);
- } else {
- throw new IllegalArgumentException("Requested nonexistent field " + s);
- }
- }
- if (fieldList.size() == 0) {
- return null;
- }
- Field f1 = fieldList.remove(fieldList.size() - 1);
- if (fieldList.size() == 0) {
- return EnumSet.of(f1);
- } else {
- return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()]));
- }
- }
-
- private static Long parseLongStr(String str) {
- return str == null ? null : Long.parseLong(str.trim());
- }
-
- private static Integer parseIntStr(String str) {
- return str == null ? null : Integer.parseInt(str.trim());
- }
-
- private static boolean parseBoolean(String booleanStr) {
- return booleanStr == null || Boolean.parseBoolean(booleanStr);
- }
-
- private static TopNConfig parseTopNConfig(String topN, String topNFunction,
- String bottomN) {
- if (topN == null || topN.isEmpty()) {
- return null;
- }
- Integer topNValue = parseIntStr(topN);
-
- if (topNValue == 0) {
- LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
- return null;
- }
-
- Boolean isBottomN = (bottomN != null && Boolean.parseBoolean(bottomN));
- return new TopNConfig(topNValue, topNFunction, isBottomN);
- }
-
- /**
- * Parses delimited string to list of strings. It skips strings that are
- * effectively empty (i.e. only whitespace).
- *
- */
- private static List<String> parseListStr(String str, String delimiter) {
- if (str == null || str.trim().isEmpty()){
- return null;
- }
-
- String[] split = str.trim().split(delimiter);
- List<String> list = new ArrayList<String>(split.length);
- for (String s : split) {
- if (!s.trim().isEmpty()){
- list.add(s);
- }
- }
-
- return list;
- }
-
- private static String parseStr(String str) {
- String trimmedInstance = (str == null) ? null : str.trim();
- if (trimmedInstance != null) {
- if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) {
- trimmedInstance = null;
- }
- }
- return trimmedInstance;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py b/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
index b6b4e0b..5faeffc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py
@@ -39,13 +39,13 @@ SERVER_START_CMD = \
"-cp {0} {1} " + \
"-Djava.net.preferIPv4Stack=true " \
"-Dproc_timelineserver " + \
- "org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer"
+ "org.apache.ambari.metrics.AMSApplicationServer"
SERVER_START_CMD_DEBUG = \
"-cp {0} {1} " + \
"-Djava.net.preferIPv4Stack=true " \
"-Dproc_timelineserver " + \
" -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend={2} " + \
- "org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer"
+ "org.apache.ambari.metrics.AMSApplicationServer"
AMC_DIE_MSG = "Ambari Metrics Collector java process died with exitcode {0}. Check {1} for more information."
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java
new file mode 100644
index 0000000..89a5759
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.data;
+
+import org.apache.ambari.metrics.core.loadsimulator.util.Json;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestAppMetrics {
+ private static final String SAMPLE_SINGLE_METRIC_HOST_JSON = "{\n" +
+ " \"metrics\" : [ {\n" +
+ " \"instanceid\" : \"\",\n" +
+ " \"hostname\" : \"localhost\",\n" +
+ " \"metrics\" : {\n" +
+ " \"0\" : \"5.35\",\n" +
+ " \"5000\" : \"5.35\",\n" +
+ " \"10000\" : \"5.35\",\n" +
+ " \"15000\" : \"5.35\"\n" +
+ " },\n" +
+ " \"starttime\" : \"1411663170112\",\n" +
+ " \"appid\" : \"HOST\",\n" +
+ " \"metricname\" : \"disk_free\"\n" +
+ " } ]\n" +
+ "}";
+
+ private static final String SAMPLE_TWO_METRIC_HOST_JSON = "{\n" +
+ " \"metrics\" : [ {\n" +
+ " \"instanceid\" : \"\",\n" +
+ " \"hostname\" : \"localhost\",\n" +
+ " \"metrics\" : {\n" +
+ " \"0\" : \"5.35\",\n" +
+ " \"5000\" : \"5.35\",\n" +
+ " \"10000\" : \"5.35\",\n" +
+ " \"15000\" : \"5.35\"\n" +
+ " },\n" +
+ " \"starttime\" : \"0\",\n" +
+ " \"appid\" : \"HOST\",\n" +
+ " \"metricname\" : \"disk_free\"\n" +
+ " }, {\n" +
+ " \"instanceid\" : \"\",\n" +
+ " \"hostname\" : \"localhost\",\n" +
+ " \"metrics\" : {\n" +
+ " \"0\" : \"94.0\",\n" +
+ " \"5000\" : \"94.0\",\n" +
+ " \"10000\" : \"94.0\",\n" +
+ " \"15000\" : \"94.0\"\n" +
+ " },\n" +
+ " \"starttime\" : \"0\",\n" +
+ " \"appid\" : \"HOST\",\n" +
+ " \"metricname\" : \"mem_cached\"\n" +
+ " } ]\n" +
+ "}";
+
+ private long[] timestamps;
+
+ @Before
+ public void setUp() throws Exception {
+ timestamps = new long[4];
+ timestamps[0] = 0;
+ timestamps[1] = timestamps[0] + 5000;
+ timestamps[2] = timestamps[1] + 5000;
+ timestamps[3] = timestamps[2] + 5000;
+
+ }
+
+ @Test
+ public void testHostDiskMetricsSerialization() throws IOException {
+ long timestamp = 1411663170112L;
+ AppMetrics appMetrics = new AppMetrics(new ApplicationInstance("localhost", AppID.HOST, ""), timestamp);
+
+ Metric diskFree = appMetrics.createMetric("disk_free");
+ double value = 5.35;
+
+ diskFree.putMetric(timestamps[0], Double.toString(value));
+ diskFree.putMetric(timestamps[1], Double.toString(value));
+ diskFree.putMetric(timestamps[2], Double.toString(value));
+ diskFree.putMetric(timestamps[3], Double.toString(value));
+
+ appMetrics.addMetric(diskFree);
+
+ String expected = SAMPLE_SINGLE_METRIC_HOST_JSON;
+ String s = new Json(true).serialize(appMetrics);
+
+ assertEquals("Serialized Host Metrics", expected, s);
+ }
+
+
+ @Test
+ public void testSingleHostManyMetricsSerialization() throws IOException {
+ AppMetrics appMetrics = new AppMetrics(new ApplicationInstance("localhost", AppID.HOST, ""), timestamps[0]);
+
+ Metric diskFree = appMetrics.createMetric("disk_free");
+ double value = 5.35;
+ diskFree.putMetric(timestamps[0], Double.toString(value));
+ diskFree.putMetric(timestamps[1], Double.toString(value));
+ diskFree.putMetric(timestamps[2], Double.toString(value));
+ diskFree.putMetric(timestamps[3], Double.toString(value));
+
+ appMetrics.addMetric(diskFree);
+
+ Metric memCache = appMetrics.createMetric("mem_cached");
+ double memVal = 94;
+ memCache.putMetric(timestamps[0], Double.toString(memVal));
+ memCache.putMetric(timestamps[1], Double.toString(memVal));
+ memCache.putMetric(timestamps[2], Double.toString(memVal));
+ memCache.putMetric(timestamps[3], Double.toString(memVal));
+
+ appMetrics.addMetric(memCache);
+
+ String expected = SAMPLE_TWO_METRIC_HOST_JSON;
+ String s = new Json(true).serialize(appMetrics);
+
+ assertEquals("Serialized Host Metrics", expected, s);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java
new file mode 100644
index 0000000..79e4b8f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.data;
+
+import org.apache.ambari.metrics.core.loadsimulator.util.Json;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+import static org.junit.Assert.assertEquals;
+
+public class TestMetric {
+ private static final String SAMPLE_METRIC_IN_JSON = "{\n" +
+ " \"instanceid\" : \"\",\n" +
+ " \"hostname\" : \"localhost\",\n" +
+ " \"metrics\" : {\n" +
+ " \"0\" : \"5.35\",\n" +
+ " \"5000\" : \"5.35\",\n" +
+ " \"10000\" : \"5.35\",\n" +
+ " \"15000\" : \"5.35\"\n" +
+ " },\n" +
+ " \"starttime\" : \"0\",\n" +
+ " \"appid\" : \"HOST\",\n" +
+ " \"metricname\" : \"disk_free\"\n" +
+ "}";
+
+ @Test
+ public void testSerializeToJson() throws IOException {
+ Metric diskOnHostMetric = new Metric(new ApplicationInstance("localhost", AppID.HOST, ""), "disk_free", 0);
+
+ long timestamp = 0;
+ double value = 5.35;
+
+ diskOnHostMetric.putMetric(timestamp, Double.toString(value));
+ diskOnHostMetric.putMetric(timestamp + 5000, Double.toString(value));
+ diskOnHostMetric.putMetric(timestamp + 10000, Double.toString(value));
+ diskOnHostMetric.putMetric(timestamp + 15000, Double.toString(value));
+
+ String expected = SAMPLE_METRIC_IN_JSON;
+ String s = new Json(true).serialize(diskOnHostMetric);
+
+ assertEquals("Json should match", expected, s);
+ }
+
+ @Test
+ public void testDeserializeObjectFromString() throws IOException {
+ String source = SAMPLE_METRIC_IN_JSON;
+
+ Metric m = new Json().deserialize(source, Metric.class);
+
+ assertEquals("localhost", m.getHostname());
+ assertEquals("HOST", m.getAppid());
+ assertEquals("", m.getInstanceid());
+ assertEquals("disk_free", m.getMetricname());
+ assertEquals("0", m.getStarttime());
+
+ assertThat(m.getMetrics()).isNotEmpty().hasSize(4).contains(
+ entry("0", "5.35"),
+ entry("5000", "5.35"),
+ entry("10000", "5.35"),
+ entry("15000", "5.35"));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java
new file mode 100644
index 0000000..8d2d38e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ambari.metrics.core.loadsimulator.MetricsLoadSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class AMSJMeterLoadTest {
+
+ private final static Logger LOG = LoggerFactory.getLogger(AMSJMeterLoadTest.class);
+ private static String PROPERTIES_FILE = "loadsimulator/ams-jmeter.properties";
+ private ScheduledExecutorService scheduledExecutorService = null;
+ private List<AppGetMetric> appGetMetrics;
+ private Properties amsJmeterProperties = null;
+
+ public AMSJMeterLoadTest(Map<String, String> args) {
+
+ String testType = args.get("type");
+ String userDefinedPropertiesFile = args.get("amsJmeterPropertiesFile");
+ if (null == userDefinedPropertiesFile || userDefinedPropertiesFile.isEmpty()) {
+ this.amsJmeterProperties = readProperties(PROPERTIES_FILE);
+ } else {
+ this.amsJmeterProperties = readProperties(userDefinedPropertiesFile);
+ }
+
+ if ("U".equals(testType)) { //GET metrics simulator
+ int numInstances = Integer.valueOf(amsJmeterProperties.getProperty("num-ui-instances"));
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(numInstances);
+ this.appGetMetrics = initializeGetMetricsPayload(amsJmeterProperties);
+ this.runTest(numInstances);
+ } else { //PUT Metrics simulator
+ Map<String, String> mapArgs = new HashMap<String, String>();
+ mapArgs.put("hostName", (args.get("host-prefix") != null) ? args.get("host-prefix") : amsJmeterProperties.getProperty("host-prefix"));
+ mapArgs.put("minHostIndex", (args.get("min-host-index") != null) ? args.get("min-host-index") : amsJmeterProperties.getProperty("min-host-index"));
+ mapArgs.put("numberOfHosts", (args.get("num-hosts") != null) ? args.get("num-hosts") : amsJmeterProperties.getProperty("num-hosts"));
+ mapArgs.put("metricsHostName", (args.get("ams-host-port") != null) ? args.get("ams-host-port") : amsJmeterProperties.getProperty("ams-host-port"));
+ mapArgs.put("collectInterval", (args.get("collection-interval") != null) ? args.get("collection-interval") : amsJmeterProperties.getProperty("collection-interval"));
+ mapArgs.put("sendInterval", (args.get("send-interval") != null) ? args.get("send-interval") : amsJmeterProperties.getProperty("send-interval"));
+ mapArgs.put("master", (args.get("create-master") != null) ? args.get("create-master") : amsJmeterProperties.getProperty("create-master"));
+ System.out.println("AMS Load Simulation Parameters : " + mapArgs);
+ MetricsLoadSimulator.startTest(mapArgs);
+ }
+ }
+
+ public static Properties readProperties(String propertiesFile) {
+ try {
+ Properties properties = new Properties();
+ InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+ if (inputStream == null) {
+ inputStream = new FileInputStream(propertiesFile);
+ }
+ properties.load(inputStream);
+ return properties;
+ } catch (IOException ioEx) {
+ LOG.error("Error reading properties file for jmeter");
+ return null;
+ }
+ }
+
+ private static List<GetMetricRequestInfo> readMetricsFromFile(String app) {
+ InputStream input = null;
+ List<GetMetricRequestInfo> metricList = new ArrayList<>();
+ String fileName = "ui_metrics_def/" + app + ".dat";
+
+ try {
+ input = ClassLoader.getSystemResourceAsStream(fileName);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+ String line;
+ List<String> metrics = new ArrayList<>();
+ while ((line = reader.readLine()) != null) {
+
+ if (line.startsWith("|")) {
+ boolean needsTimestamps = line.contains("startTime");
+ boolean needsHost = line.contains("hostname");
+ metricList.add(new GetMetricRequestInfo(metrics, needsTimestamps, needsHost));
+ metrics.clear();
+ } else {
+ metrics.add(line);
+ }
+ }
+ return metricList;
+ } catch (IOException e) {
+ LOG.error("Cannot read file " + fileName + " for appID " + app, e);
+ } finally {
+ if (input != null) {
+ try {
+ input.close();
+ } catch (IOException ex) {
+ }
+ }
+ }
+ return null;
+ }
+
+ private static List<AppGetMetric> initializeGetMetricsPayload(Properties amsJmeterProperties) {
+
+ List<AppGetMetric> appGetMetrics = new ArrayList<AppGetMetric>();
+ String appsToTest = amsJmeterProperties.getProperty("apps-to-test");
+ String[] apps;
+
+ if (appsToTest != null && !appsToTest.isEmpty()) {
+ apps = StringUtils.split(appsToTest, ",");
+ } else {
+ apps = new String[JmeterTestPlanTask.ClientApp.values().length];
+ int ctr = 0;
+ for (JmeterTestPlanTask.ClientApp app : JmeterTestPlanTask.ClientApp.values())
+ apps[ctr++] = app.getId();
+ }
+
+ for (String app : apps) {
+
+ int interval = Integer.valueOf(amsJmeterProperties.getProperty("get-interval"));
+ String intervalString = amsJmeterProperties.getProperty(app + "-get-interval");
+ if (intervalString != null && !intervalString.isEmpty()) {
+ interval = Integer.valueOf(intervalString);
+ }
+ appGetMetrics.add(new AppGetMetric(readMetricsFromFile(app), interval, app));
+ }
+
+ return appGetMetrics;
+ }
+
+ public void runTest(int numInstances) {
+
+ int appRefreshRate = Integer.valueOf(amsJmeterProperties.getProperty("app-refresh-rate"));
+ for (int i = 0; i < numInstances; i++) {
+ ScheduledFuture future = scheduledExecutorService.scheduleAtFixedRate(new JmeterTestPlanTask(appGetMetrics,
+ amsJmeterProperties), 0, appRefreshRate, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Sample Usage:
+ * java -cp "lib/*":ambari-metrics-timelineservice-2.1.1.0.jar org.apache.ambari.metrics
+ * .core.loadsimulator.jmeter.AMSJMeterLoadTest
+ * -t UI -p ambari-metrics-timelineservice/src/main/resources/jmeter/ams-jmeter.properties
+ */
+ public static void main(String[] args) {
+ Map<String, String> mapArgs = parseArgs(args);
+ new AMSJMeterLoadTest(mapArgs);
+ }
+
+ private static Map<String, String> parseArgs(String[] args) {
+ Map<String, String> mapProps = new HashMap<String, String>();
+ if (args.length == 0) {
+ printUsage();
+ throw new RuntimeException("Unexpected argument, See usage message.");
+ } else {
+ for (int i = 0; i < args.length; i += 2) {
+ String arg = args[i];
+ mapProps.put(arg.substring(1), args[i+1]);
+ }
+ }
+ return mapProps;
+ }
+
+ public static void printUsage() {
+ System.err.println("Usage: java AMSJmeterLoadTest [OPTIONS]");
+ System.err.println("Options: ");
+ System.err.println("[--t type (S=>Sink/U=>UI)] [-ams-host-port localhost:6188] [-min-host-index 2] [-host-prefix TestHost.] [-num-hosts 2] " +
+ "[-create-master true] [-collection-interval 10000 ] [-send-interval 60000 ] [-p amsJmeterPropertiesFile (Optional)]");
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java
new file mode 100644
index 0000000..bc6428e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import java.util.List;
+
+public class AppGetMetric {
+
+ private String app;
+ private int interval;
+ private List<GetMetricRequestInfo> requests;
+
+ public AppGetMetric(List<GetMetricRequestInfo> requests, int interval, String app) {
+ this.setMetricRequests(requests);
+ this.setInterval(interval);
+ this.setApp(app);
+ }
+
+ public List<GetMetricRequestInfo> getMetricRequests() {
+ return requests;
+ }
+
+ public void setMetricRequests(List<GetMetricRequestInfo> requests) {
+ this.requests = requests;
+ }
+
+ public int getInterval() {
+ return interval;
+ }
+
+ public void setInterval(int interval) {
+ this.interval = interval;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java
new file mode 100644
index 0000000..60ed3eb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.List;
+
+
+public class GetMetricRequestInfo {
+
+ private String metricStringPayload;
+ private boolean needsTimestamps;
+ private boolean needsHost;
+
+ public GetMetricRequestInfo(List<String> metrics, boolean needsTimestamps, boolean needsHost) {
+
+ this.setMetricStringPayload(StringUtils.join(metrics, ","));
+ this.setNeedsTimestamps(needsTimestamps);
+ this.setNeedsHost(needsHost);
+ }
+
+ public String getMetricStringPayload() {
+ return metricStringPayload;
+ }
+
+ public void setMetricStringPayload(String metricStringPayload) {
+ this.metricStringPayload = metricStringPayload;
+ }
+
+ public boolean needsTimestamps() {
+ return needsTimestamps;
+ }
+
+ public void setNeedsTimestamps(boolean needsTimestamps) {
+ this.needsTimestamps = needsTimestamps;
+ }
+
+ public boolean needsHost() {
+ return needsHost;
+ }
+
+ public void setNeedsHost(boolean needsHost) {
+ this.needsHost = needsHost;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java
new file mode 100644
index 0000000..0590c73
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jmeter.control.LoopController;
+import org.apache.jmeter.engine.StandardJMeterEngine;
+import org.apache.jmeter.protocol.http.sampler.HTTPSampler;
+import org.apache.jmeter.protocol.http.util.HTTPConstants;
+import org.apache.jmeter.reporters.ResultCollector;
+import org.apache.jmeter.reporters.Summariser;
+import org.apache.jmeter.testelement.TestElement;
+import org.apache.jmeter.testelement.TestPlan;
+import org.apache.jmeter.threads.JMeterContextService;
+import org.apache.jmeter.threads.ThreadGroup;
+import org.apache.jmeter.timers.ConstantTimer;
+import org.apache.jmeter.util.JMeterUtils;
+import org.apache.jorphan.collections.HashTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+public class JmeterTestPlanTask implements Runnable {
+
+ private static StandardJMeterEngine jmeterEngine = null;
+ private final static Logger LOG = LoggerFactory.getLogger(JmeterTestPlanTask.class);
+ private List<AppGetMetric> appGetMetrics;
+ private Properties amsJmeterProperties;
+ private HashTree amsTestPlanTree;
+ private TestPlan amsTestPlan;
+ private static final String JMETER_HOME = "loadsimulator";
+ private static final String JMETER_PROPERTIES_FILE = JMETER_HOME + "/jmeter.properties";
+ private static final String SAVESERVICE_PROPERTIES_FILE = JMETER_HOME + "/saveservice.properties";
+
+ public enum ClientApp {
+ HOST("HOST"),
+ NAMENODE("NAMENODE"),
+ HBASE("HBASE"),
+ NIMBUS("NIMBUS"),
+ KAFKA_BROKER("KAFKA_BROKER"),
+ FLUME_HANDLER("FLUME_HANDLER"),
+ AMS_HBASE("AMS-HBASE"),
+ NODEMANAGER("NODEMANAGER"),
+ RESOURCEMANAGER("RESOURCEMANAGER"),
+ DATANODE("DATANODE");
+
+ private String id;
+
+ private ClientApp(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+ }
+
+ public JmeterTestPlanTask(List<AppGetMetric> appGetMetrics, Properties amsJmeterProperties) {
+ this.appGetMetrics = appGetMetrics;
+ this.amsJmeterProperties = amsJmeterProperties;
+ amsTestPlanTree = new HashTree();
+ amsTestPlan = new TestPlan("AMS JMeter Load Test plan");
+ System.out.println("Starting AMS Jmeter load testing");
+ }
+
+ public void run() {
+ if (jmeterEngine != null) {
+
+ Object[] threadGroups = amsTestPlanTree.getArray(amsTestPlan);
+ for (Object threadGroupObj : threadGroups) {
+ if (threadGroupObj instanceof ThreadGroup) {
+ ThreadGroup threadGroup = (ThreadGroup) threadGroupObj;
+ threadGroup.stop();
+ }
+ }
+ amsTestPlanTree.clear();
+ jmeterEngine.askThreadsToStop();
+ jmeterEngine.stopTest();
+ JMeterContextService.endTest();
+ }
+
+ //Start the new test plan for the new app.
+ try {
+ //Initialize Jmeter essentials
+ jmeterEngine = new StandardJMeterEngine();
+ JMeterContextService.getContext().setEngine(jmeterEngine);
+
+ //Workaround to supply JMeterUtils with jmeter.prooperties from JAR.
+ JMeterUtils.setJMeterHome("");
+ Field f = new JMeterUtils().getClass().getDeclaredField("appProperties");
+ f.setAccessible(true);
+ f.set(null, AMSJMeterLoadTest.readProperties(JMETER_PROPERTIES_FILE));
+
+ //Copy saveservices.properties file to tmp dir for JMeter to consume.
+ InputStream inputStream = ClassLoader.getSystemResourceAsStream(SAVESERVICE_PROPERTIES_FILE);
+ if (inputStream == null) {
+ inputStream = new FileInputStream(SAVESERVICE_PROPERTIES_FILE);
+ }
+ String tmpDir = System.getProperty("java.io.tmpdir");
+ OutputStream outputStream = new FileOutputStream(tmpDir + "/saveservice.properties");
+ IOUtils.copy(inputStream, outputStream);
+ outputStream.close();
+ JMeterUtils.setProperty("saveservice_properties", tmpDir + "/saveservice.properties");
+
+ //Initialize Test plan
+ amsTestPlan.setProperty(TestElement.TEST_CLASS, TestPlan.class.getName());
+ amsTestPlanTree.add("AMS Test plan", amsTestPlan);
+
+ //Choose a random APP to run the perform GET metrics request.
+ int currentAppIndex = new Random().nextInt(appGetMetrics.size());
+
+ //Create ThreadGroup for the App
+ createThreadGroupHashTree(currentAppIndex, amsJmeterProperties, amsTestPlanTree, amsTestPlan);
+
+ //Geneates the JMX file that you can use through the GUI mode.
+ //SaveService.saveTree(amsTestPlanTree, new FileOutputStream(JMETER_HOME + "/" + "amsTestPlan.jmx"));
+
+ //Summarizer output to get test progress in stdout like.
+ Summariser summariser = null;
+ String summariserName = JMeterUtils.getPropDefault("summariser.name", "summary");
+ if (summariserName.length() > 0) {
+ summariser = new Summariser(summariserName);
+ }
+
+ //Store execution results into a .jtl file
+ String jmeterLogFile = tmpDir + "/amsJmeterTestResults.jtl";
+ ResultCollector resultCollector = new ResultCollector(summariser);
+ resultCollector.setFilename(jmeterLogFile);
+ amsTestPlanTree.add(amsTestPlanTree.getArray()[0], resultCollector);
+ jmeterEngine.configure(amsTestPlanTree);
+ jmeterEngine.run();
+
+ LOG.info("AMS Jmeter Test started up successfully");
+
+ } catch (Exception ioEx) {
+ amsTestPlanTree.clear();
+ jmeterEngine.askThreadsToStop();
+ jmeterEngine.stopTest();
+ JMeterContextService.endTest();
+ LOG.error("Error occurred while running AMS load test : " + ioEx.getMessage());
+ ioEx.printStackTrace();
+ }
+ }
+
+ private ConstantTimer createConstantTimer(int delay) {
+ ConstantTimer timer = new ConstantTimer();
+ timer.setDelay("" + delay);
+ return timer;
+ }
+
+ private Map<String, String> getAppSpecificParameters(String app, GetMetricRequestInfo request, Properties amsJmeterProperties) {
+
+ Map<String, String> parametersMap = new HashMap<String, String>();
+ String hostPrefix = amsJmeterProperties.getProperty("host-prefix");
+ String hostSuffix = amsJmeterProperties.getProperty("host-suffix");
+ int minHostIndex = Integer.valueOf(amsJmeterProperties.getProperty("min-host-index"));
+ int numHosts = Integer.valueOf(amsJmeterProperties.getProperty("num-hosts"));
+
+ parametersMap.put("appId", app);
+
+ if (request.needsTimestamps()) {
+ long currentTime = System.currentTimeMillis();
+ long oneHourBack = currentTime - 3600 * 1000;
+ parametersMap.put("startTime", String.valueOf(oneHourBack));
+ parametersMap.put("endTime", String.valueOf(currentTime));
+ }
+
+ if (request.needsHost()) {
+ if (ClientApp.AMS_HBASE.getId().equals(app)) {
+ parametersMap.put("hostname", amsJmeterProperties.getProperty("ams-host"));
+ } else if (ClientApp.HOST.getId().equals(app) || ClientApp.NODEMANAGER.getId().equals(app)) {
+ int randomHost = minHostIndex + new Random().nextInt(numHosts);
+ parametersMap.put("hostname", hostPrefix + randomHost + hostSuffix);
+ } else {
+ parametersMap.put("hostname", hostPrefix + amsJmeterProperties.getProperty(app + "-host") + hostSuffix);
+ }
+ }
+ parametersMap.put("metricNames", request.getMetricStringPayload());
+ return parametersMap;
+ }
+
+ private void createThreadGroupHashTree(int appIndex, Properties amsJmeterProperties, HashTree amsTestPlanTree, TestPlan amsTestPlan) {
+
+ AppGetMetric appGetMetric = appGetMetrics.get(appIndex);
+ String app = appGetMetric.getApp();
+ int interval = appGetMetric.getInterval();
+
+ //Read and validate AMS information.
+ String[] amsHostPort = amsJmeterProperties.getProperty("ams-host-port").split(":");
+ String amsHost = amsHostPort[0];
+ String amsPath = amsJmeterProperties.getProperty("ams-path");
+ int amsPort = Integer.valueOf(amsHostPort[1]);
+ int numLoops = Integer.valueOf(amsJmeterProperties.getProperty("num-get-calls-per-app"));
+
+ LoopController loopController = createLoopController(app + " GET loop controller", numLoops, false);
+ for (GetMetricRequestInfo request : appGetMetric.getMetricRequests()) {
+
+ ThreadGroup threadGroup = createThreadGroup(app + " GET threadGroup", 1, 0, loopController);
+
+ HashTree threadGroupHashTree = amsTestPlanTree.add(amsTestPlan, threadGroup);
+ Map<String, String> parametersMap = getAppSpecificParameters(app, request, amsJmeterProperties);
+
+ HTTPSampler sampler = createGetSampler("GET " + app + " metrics", amsHost, amsPort, amsPath, null, parametersMap);
+
+ if (numLoops > 1) {
+ threadGroupHashTree.add(createConstantTimer(interval));
+ }
+
+ threadGroupHashTree.add(sampler);
+ }
+ }
+
+ private HTTPSampler createGetSampler(String name, String domain, int port, String path, String encoding, Map<String, String> parameters) {
+
+ HTTPSampler sampler = new HTTPSampler();
+ sampler.setDomain(domain);
+ sampler.setPort(port);
+ sampler.setPath(path);
+ sampler.setMethod(HTTPConstants.GET);
+
+ if (encoding != null)
+ sampler.setContentEncoding(encoding);
+
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ sampler.addArgument(entry.getKey(), entry.getValue());
+ }
+ sampler.setName(name);
+ return sampler;
+ }
+
+ private LoopController createLoopController(String name, int numLoops, boolean continueForever) {
+ LoopController loopController = new LoopController();
+ loopController.setLoops(numLoops);
+ loopController.setProperty(TestElement.TEST_CLASS, LoopController.class.getName());
+ loopController.initialize();
+ loopController.setContinueForever(continueForever);
+ loopController.setName(name);
+ return loopController;
+ }
+
+ private ThreadGroup createThreadGroup(String name, int numThreads, int rampUp, LoopController loopController) {
+ ThreadGroup threadGroup = new ThreadGroup();
+ threadGroup.setName(name);
+ threadGroup.setNumThreads(numThreads);
+ threadGroup.setRampUp(rampUp);
+ threadGroup.setSamplerController(loopController);
+ threadGroup.setProperty(TestElement.TEST_CLASS, ThreadGroup.class.getName());
+ return threadGroup;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java
new file mode 100644
index 0000000..9c8e641
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.net;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+public class TestRestMetricsSender {
+
+ @Test
+ public void testPushMetrics() throws Exception {
+ final UrlService svcMock = createStrictMock(UrlService.class);
+ final String payload = "test";
+ final String expectedResponse = "mockResponse";
+
+ expect(svcMock.send(anyString())).andReturn(expectedResponse);
+ svcMock.disconnect();
+ expectLastCall();
+
+ replay(svcMock);
+
+ RestMetricsSender sender = new RestMetricsSender("expectedHostName") {
+ @Override
+ protected UrlService getConnectedUrlService() throws IOException {
+ return svcMock;
+ }
+ };
+ String response = sender.pushMetrics(payload);
+
+ verify(svcMock);
+ assertEquals("", expectedResponse, response);
+ }
+
+ @Test
+ public void testPushMetricsFailed() throws Exception {
+ final UrlService svcMock = createStrictMock(UrlService.class);
+ final String payload = "test";
+ final String expectedResponse = "mockResponse";
+ RestMetricsSender sender = new RestMetricsSender("expectedHostName") {
+ @Override
+ protected UrlService getConnectedUrlService() throws IOException {
+ return svcMock;
+ }
+ };
+
+ expect(svcMock.send(anyString())).andThrow(new IOException());
+ svcMock.disconnect();
+ expectLastCall();
+
+ replay(svcMock);
+
+ String response = sender.pushMetrics(payload);
+
+ verify(svcMock);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java
new file mode 100644
index 0000000..29ebda4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.net;
+
+
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+public class TestStdOutMetricsSender {
+
+ @Test
+ public void testPushMetrics() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(baos);
+ StdOutMetricsSender sender = new StdOutMetricsSender("expectedHostName", out);
+ sender.pushMetrics("test");
+
+ System.out.println(baos.toString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java
new file mode 100644
index 0000000..a1801b0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.loadsimulator.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestRandomMetricsProvider {
+
+ @Test
+ public void testReturnSingle() {
+ double from = 5.25;
+ double to = 5.40;
+ RandomMetricsProvider provider = new RandomMetricsProvider(from, to);
+ double metric = provider.next();
+
+ assertTrue("Generated metric should be in range", from < metric && metric < to);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java
new file mode 100644
index 0000000..9011e75
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.loadsimulator.util;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+public class TestTimeStampProvider {
+
+ @Test
+ public void testReturnSingle() {
+ long startTime = 1411663170112L;
+ int timeStep = 5000;
+ TimeStampProvider tm = new TimeStampProvider(startTime, timeStep, 0);
+
+ long tStamp = tm.next();
+
+ assertEquals("First generated timestamp should match starttime", startTime, tStamp);
+ }
+
+ @Test
+ public void testReturnTstampsForSendInterval() throws Exception {
+ long startTime = 0;
+ int collectInterval = 5;
+ int sendInterval = 30;
+ TimeStampProvider tsp = new TimeStampProvider(startTime, collectInterval, sendInterval);
+
+ long[] timestamps = tsp.timestampsForNextInterval();
+
+ assertThat(timestamps)
+ .hasSize(6)
+ .containsOnly(0, 5, 10, 15, 20, 25);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
new file mode 100644
index 0000000..0553d4c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
+
+ protected static final long BATCH_SIZE = 3;
+ protected Connection conn;
+ protected PhoenixHBaseAccessor hdb;
+ protected TimelineMetricMetadataManager metadataManager;
+
+ public final Log LOG;
+
+ public AbstractMiniHBaseClusterTest() {
+ LOG = LogFactory.getLog(this.getClass());
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = getDefaultProps();
+ props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, "false");
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+ props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+ // Make a small batch size to test multiple calls to reserve sequences
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+ // Must update config before starting server
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @AfterClass
+ public static void doTeardown() throws Exception {
+ dropNonSystemTables();
+ tearDownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ Logger.getLogger("org.apache.ambari.metrics.core.timeline").setLevel(Level.DEBUG);
+ hdb = createTestableHBaseAccessor();
+ // inits connection, starts mini cluster
+ conn = getConnection(getUrl());
+
+ hdb.initMetricSchema();
+ metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb);
+ hdb.setMetadataInstance(metadataManager);
+ }
+
+ private void deleteTableIgnoringExceptions(Statement stmt, String tableName) {
+ try {
+ stmt.execute("delete from " + tableName);
+ } catch (Exception e) {
+ LOG.warn("Exception on delete table " + tableName, e);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ Connection conn = null;
+ Statement stmt = null;
+ try {
+ conn = getConnection(getUrl());
+ stmt = conn.createStatement();
+
+ deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE");
+ deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_MINUTE");
+ deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_HOURLY");
+ deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_DAILY");
+ deleteTableIgnoringExceptions(stmt, "METRIC_RECORD");
+ deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_MINUTE");
+ deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_HOURLY");
+ deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_DAILY");
+ deleteTableIgnoringExceptions(stmt, "METRICS_METADATA");
+ deleteTableIgnoringExceptions(stmt, "HOSTED_APPS_METADATA");
+
+ conn.commit();
+ } catch (Exception e) {
+ LOG.warn("Error on deleting HBase schema.", e);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ public static Map<String, String> getDefaultProps() {
+ Map<String, String> props = new HashMap<String, String>();
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ Boolean.FALSE.toString());
+ props.put("java.security.krb5.realm", "");
+ props.put("java.security.krb5.kdc", "");
+ return props;
+ }
+
+ protected Connection getConnection(String url) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ return conn;
+ }
+
+ /**
+ * A canary test. Will show if the infrastructure is set-up correctly.
+ */
+ @Test
+ public void testClusterOK() throws Exception {
+ Connection conn = getConnection(getUrl());
+ conn.setAutoCommit(true);
+
+ String sampleDDL = "CREATE TABLE TEST_METRICS " +
+ "(TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+ "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " +
+ "TTL=86400, COMPRESSION='NONE' ";
+
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate(sampleDDL);
+ conn.commit();
+
+ ResultSet rs = stmt.executeQuery(
+ "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+ rs.next();
+ long l = rs.getLong(1);
+ assertThat(l).isGreaterThanOrEqualTo(0);
+
+ stmt.execute("DROP TABLE TEST_METRICS");
+ conn.close();
+ }
+
+ protected PhoenixHBaseAccessor createTestableHBaseAccessor() {
+ Configuration metricsConf = new Configuration();
+ metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+ // Unit tests insert values into the future
+ metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
+
+ return
+ new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf),
+ new PhoenixConnectionProvider() {
+ @Override
+ public Admin getHBaseAdmin() throws IOException {
+ try {
+ return driver.getConnectionQueryServices(null, null).getAdmin();
+ } catch (SQLException e) {
+ LOG.error(e);
+ }
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+
+ });
+ }
+
+ protected void insertMetricRecords(Connection conn, TimelineMetrics metrics)
+ throws SQLException, IOException {
+
+ List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+ if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+ LOG.debug("Empty metrics insert request.");
+ return;
+ }
+
+ PreparedStatement metricRecordStmt = null;
+
+ try {
+ metricRecordStmt = conn.prepareStatement(String.format(
+ UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+ for (TimelineMetric metric : timelineMetrics) {
+ metricRecordStmt.clearParameters();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("host: " + metric.getHostName() + ", " +
+ "metricName = " + metric.getMetricName() + ", " +
+ "values: " + metric.getMetricValues());
+ }
+ double[] aggregates = AggregatorUtils.calculateAggregates(
+ metric.getMetricValues());
+
+ byte[] uuid = metadataManager.getUuid(metric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+ continue;
+ }
+ metricRecordStmt.setBytes(1, uuid);
+ metricRecordStmt.setLong(2, metric.getStartTime());
+ metricRecordStmt.setDouble(3, aggregates[0]);
+ metricRecordStmt.setDouble(4, aggregates[1]);
+ metricRecordStmt.setDouble(5, aggregates[2]);
+ metricRecordStmt.setInt(6, (int) aggregates[3]);
+ String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+ metricRecordStmt.setString(7, json);
+
+ try {
+ int row = metricRecordStmt.executeUpdate();
+ LOG.info("Inserted " + row + " rows.");
+ } catch (SQLException sql) {
+ LOG.error(sql);
+ }
+ }
+
+ conn.commit();
+
+ } finally {
+ if (metricRecordStmt != null) {
+ try {
+ metricRecordStmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java
new file mode 100644
index 0000000..d3fc50f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public abstract class AbstractPhoenixConnectionlessTest extends BaseTest {
+
+ protected static String getUrl() {
+ return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+ }
+
+ protected static String getUrl(String tenantId) {
+ return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+ }
+
+ protected static PhoenixTestDriver driver;
+
+ private static void startServer(String url) throws Exception {
+ assertNull(driver);
+ // only load the test driver if we are testing locally - for integration tests, we want to
+ // test on a wider scale
+ if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+ driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+ assertTrue(DriverManager.getDriver(url) == driver);
+ driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ }
+ }
+
+ protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
+ if (driver == null) {
+ driver = new PhoenixTestDriver(props);
+ DriverManager.registerDriver(driver);
+ }
+ return driver;
+ }
+
+ private String connUrl;
+
+ @Before
+ public void setup() throws Exception {
+ connUrl = getUrl();
+ startServer(connUrl);
+ }
+
+ @Test
+ public void testStorageSystemInitialized() throws Exception {
+ String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
+ "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try {
+ conn = DriverManager.getConnection(connUrl);
+ stmt = conn.prepareStatement(sampleDDL);
+ stmt.execute();
+ conn.commit();
+ } finally {
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (driver != null) {
+ try {
+ driver.close();
+ } finally {
+ PhoenixTestDriver phoenixTestDriver = driver;
+ driver = null;
+ DriverManager.deregisterDriver(phoenixTestDriver);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java
new file mode 100644
index 0000000..03e39f7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.fromMetricName;
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.RATE;
+import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.DIFF;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class FunctionTest {
+
+ @Test
+ public void testCreation() throws Exception {
+ Function f = fromMetricName("Metric._avg");
+ assertThat(f).isEqualTo(new Function(AVG, null));
+
+ f = fromMetricName("Metric._rate._avg");
+ assertThat(f).isEqualTo(new Function(AVG, RATE));
+
+ f = fromMetricName("bytes_in");
+ assertThat(f).isEqualTo(Function.DEFAULT_VALUE_FUNCTION);
+
+ // Rate support without aggregates
+ f = fromMetricName("Metric._rate");
+ assertThat(f).isEqualTo(new Function(null, RATE));
+
+ // Diff support
+ f = fromMetricName("Metric._diff._avg");
+ assertThat(f).isEqualTo(new Function(AVG, DIFF));
+
+ // Diff support without aggregates
+ f = fromMetricName("Metric._diff");
+ assertThat(f).isEqualTo(new Function(null, DIFF));
+
+ }
+
+ @Ignore // If unknown function: behavior is best effort query without function
+ @Test(expected = Function.FunctionFormatException.class)
+ public void testNotAFunction() throws Exception {
+ fromMetricName("bytes._not._afunction");
+ }
+}